generate rss feed; Fixes #4

Cargo.lock (generated, 10 lines changed)
@@ -820,6 +820,7 @@ dependencies = [
  "axum",
  "bytes",
  "chrono",
+ "quick-xml",
  "reqwest",
  "serde_json",
  "stream-download",
@@ -855,6 +856,15 @@ dependencies = [
  "unicode-ident",
 ]
 
+[[package]]
+name = "quick-xml"
+version = "0.38.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "42a232e7487fc2ef313d96dde7948e7a3c05101870d8985e4fd8d26aedd27b89"
+dependencies = [
+ "memchr",
+]
+
 [[package]]
 name = "quinn"
 version = "0.11.8"

Cargo.toml (1 line changed)
@@ -14,3 +14,4 @@ serde_json = "1"
 stream-download = "0.22"
 chrono = "0.4"
 tokio-cron-scheduler = "0.14"
+quick-xml = "0.38"
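
The new quick-xml dependency is pulled in only for its XML-escaping helper, used below when URLs are written into the feed. A minimal sketch of what it does, assuming the quick-xml 0.38 API added above (the URL is illustrative, not from the project):

    fn main() {
        let url = "https://example.org/stream?channel=oe1&ts=0730";
        // quick_xml::escape::escape returns a Cow<str> with the XML special
        // characters (& < > ' ") replaced by entities.
        assert_eq!(
            quick_xml::escape::escape(url),
            "https://example.org/stream?channel=oe1&amp;ts=0730"
        );
    }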

src/main.rs (21 lines changed)
@@ -2,10 +2,8 @@ mod state;
 mod streamer;
 
 use axum::{routing::get, Router};
-use chrono::Utc;
 use state::AppState;
 use std::sync::Arc;
-use tokio_cron_scheduler::{Job, JobScheduler};
 
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -15,25 +13,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         .route("/", get(streamer::stream_handler))
         .with_state(state.clone());
 
-    //let scheduler = JobScheduler::new().await.unwrap();
-    //scheduler
-    //    .add(
-    //        Job::new_async(
-    //            "30 0 7 * * Mon,Tue,Wed,Thu,Fri,Sat",
-    //            move |_uuid, _locked| {
-    //                let state_for_task = state.clone();
-    //                Box::pin(async move {
-    //                    state_for_task.check_update().await;
-    //                    println!("Task executed at: {}", Utc::now());
-    //                })
-    //            },
-    //        )
-    //        .unwrap(),
-    //    )
-    //    .await
-    //    .unwrap();
-    //scheduler.start().await.unwrap();
-
     println!("Streaming server running on http://localhost:3029");
 
     let listener = tokio::net::TcpListener::bind("0.0.0.0:3029").await?;
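
With the scheduler gone, the daily refresh is request-driven: stream_handler (below) calls check_update, whose date gate makes repeat calls on the same day a no-op. For orientation, the whole post-commit main reads roughly as follows; the lines not shown in the hunks above (state construction, the serve call) are assumptions, not part of the diff:

    mod state;
    mod streamer;

    use axum::{routing::get, Router};
    use state::AppState;
    use std::sync::Arc;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Assumed: the .with_state(state.clone()) context implies a shared
        // AppState behind an Arc.
        let state = Arc::new(AppState::new());
        let app = Router::new()
            .route("/", get(streamer::stream_handler))
            .with_state(state.clone());

        println!("Streaming server running on http://localhost:3029");

        let listener = tokio::net::TcpListener::bind("0.0.0.0:3029").await?;
        // Assumed axum 0.7-style serve call; not shown by the diff.
        axum::serve(listener, app).await?;
        Ok(())
    }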

src/state.rs (133 lines changed)
@@ -1,143 +1,38 @@
-use bytes::Bytes;
 use chrono::{Local, NaiveDate};
-use std::{io::Read, sync::Arc, time::Duration};
-use stream_download::{
-    source::DecodeError, storage::temp::TempStorageProvider, Settings, StreamDownload,
-};
-use tokio::sync::{mpsc, Notify, RwLock};
+use std::sync::Arc;
+use tokio::sync::RwLock;
 
 pub struct AppState {
-    pub chunks: RwLock<Vec<Bytes>>,
-    pub complete: RwLock<bool>,
-    pub notify: Notify,
-    pub downloaded_on_day: RwLock<Option<NaiveDate>>,
+    pub urls: RwLock<Vec<String>>,
+    pub last_download_on_day: RwLock<Option<NaiveDate>>,
 }
 
 impl AppState {
     pub fn new() -> Self {
         Self {
-            chunks: RwLock::new(Vec::new()),
-            complete: RwLock::new(false),
-            notify: Notify::new(),
-            downloaded_on_day: RwLock::new(None),
+            urls: RwLock::new(Vec::new()),
+            last_download_on_day: RwLock::new(None),
         }
     }
 
     pub async fn check_update(self: Arc<Self>) {
         let today = Local::now().date_naive();
-        if let Some(downloaded_on_day) = *self.downloaded_on_day.read().await
-            && today == downloaded_on_day {
+        if let Some(downloaded_on_day) = *self.last_download_on_day.read().await
+            && today == downloaded_on_day
+        {
             return;
         }
 
-        self.reset().await;
-        *self.downloaded_on_day.write().await = Some(today);
+        *self.last_download_on_day.write().await = Some(today);
 
         let latest_url = player::newest_morning_journal_streaming_url()
             .await
             .unwrap();
 
-        let self_clone = Arc::clone(&self);
-        tokio::spawn(async move {
-            if let Err(e) = self_clone.download_stream(&latest_url).await {
-                eprintln!("Download failed: {e:?}");
-            }
-        });
-    }
-
-    async fn download_stream(&self, url: &str) -> Result<(), Box<dyn std::error::Error>> {
-        println!("Starting download from: {url}");
-
-        let reader = match StreamDownload::new_http(
-            url.parse()?,
-            TempStorageProvider::new(),
-            Settings::default(),
-        )
-        .await
-        {
-            Ok(reader) => reader,
-            Err(e) => return Err(e.decode_error().await.into()),
-        };
-
-        let (tx, mut rx) = mpsc::unbounded_channel::<bytes::Bytes>();
-
-        // Spawn blocking task to read from the stream
-        let read_handle = tokio::task::spawn_blocking(
-            move || -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
-                let mut reader = reader;
-                let mut buffer = vec![0u8; 8192]; // 8KB buffer
-
-                loop {
-                    match reader.read(&mut buffer) {
-                        Ok(0) => {
-                            // End of stream
-                            println!("End of stream reached");
-                            break;
-                        }
-                        Ok(n) => {
-                            let chunk = bytes::Bytes::copy_from_slice(&buffer[..n]);
-                            if tx.send(chunk).is_err() {
-                                // Receiver dropped, exit
-                                break;
-                            }
-                        }
-                        Err(e) => {
-                            println!("Error reading from stream: {e:?}");
-                            // Short delay before retrying
-                            std::thread::sleep(Duration::from_millis(100));
-                        }
-                    }
-                }
-
-                Ok(())
-            },
-        );
-
-        // Process received chunks
-        while let Some(bytes) = rx.recv().await {
-            self.add_chunk(bytes).await;
-        }
-
-        // Wait for the reading task to complete
-        match read_handle.await {
-            Ok(result) => result.unwrap(),
-            Err(e) => return Err(e.into()),
-        }
-
-        self.mark_complete().await;
-        println!("Download complete!");
-
-        Ok(())
-    }
-
-    async fn reset(&self) {
-        // Clear all chunks
-        self.chunks.write().await.clear();
-
-        // Reset completion status
-        *self.complete.write().await = false;
-
-        *self.downloaded_on_day.write().await = None;
-
-        // Notify any waiting tasks about the reset
-        self.notify.notify_waiters();
-    }
-
-    async fn add_chunk(&self, chunk: Bytes) {
-        self.chunks.write().await.push(chunk);
-        self.notify.notify_waiters();
-    }
-
-    async fn mark_complete(&self) {
-        *self.complete.write().await = true;
-        self.notify.notify_waiters();
-    }
-
-    pub async fn is_complete(&self) -> bool {
-        *self.complete.read().await
-    }
-
-    pub async fn chunk_count(&self) -> usize {
-        self.chunks.read().await.len()
+        let mut old = self.urls.read().await.clone();
+        old.push(latest_url);
+        let new = old.into_iter().rev().take(10).collect(); // only keep last 10
+
+        *self.urls.write().await = new;
     }
 }
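
check_update now only records today's date and rotates a capped list of stream URLs. One subtlety: rev() runs before take(10), so a rotation returns the list newest-first, and the next rotation reverses it again; only the most recent entry has a guaranteed position. A standalone sketch of a single rotation (URLs illustrative):

    fn main() {
        // Twelve previously stored stream URLs, oldest first.
        let mut old: Vec<String> =
            (1..=12).map(|i| format!("https://example.org/mj/{i}")).collect();
        old.push("https://example.org/mj/13".to_string());

        // The same expression check_update uses to cap the list.
        let new: Vec<String> = old.into_iter().rev().take(10).collect();

        assert_eq!(new.len(), 10); // the oldest entries were dropped
        assert_eq!(new[0], "https://example.org/mj/13"); // newest entry first
    }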

src/streamer.rs
@@ -1,56 +1,41 @@
 use crate::state::AppState;
-use axum::{
-    body::Body,
-    extract::State,
-    response::{Redirect, Response},
-};
-use player::newest_morning_journal_streaming_url;
+use axum::{extract::State, http::HeaderMap, response::IntoResponse};
+use reqwest::header;
 use std::sync::Arc;
-use tokio_stream::Stream;
 
-pub async fn stream_handler(State(state): State<Arc<AppState>>) -> Redirect {
-    let url = newest_morning_journal_streaming_url().await.unwrap();
-    Redirect::temporary(&url)
-
-    //state.clone().check_update().await;
-
-    //let stream = create_chunk_stream(state);
-    //let body = Body::from_stream(stream);
-
-    //Response::builder()
-    //    .header("Content-Type", "audio/mpeg")
-    //    .header("Cache-Control", "no-cache, no-store, must-revalidate")
-    //    .header("Pragma", "no-cache")
-    //    .header("Expires", "0")
-    //    .header("Accept-Ranges", "none")
-    //    .header("Transfer-Encoding", "chunked")
-    //    .header("X-Content-Duration", "infinity")
-    //    .body(body)
-    //    .unwrap()
+pub async fn stream_handler(State(state): State<Arc<AppState>>) -> impl IntoResponse {
+    state.clone().check_update().await;
+
+    let content = feed(&state.urls.read().await.to_vec());
+
+    let mut headers = HeaderMap::new();
+    headers.insert(header::CONTENT_TYPE, "application/rss+xml".parse().unwrap());
+    (headers, content)
 }
 
-fn create_chunk_stream(
-    state: Arc<AppState>,
-) -> impl Stream<Item = Result<bytes::Bytes, std::io::Error>> {
-    async_stream::stream! {
-        let mut position = 0;
-
-        loop {
-            // Send available chunks
-            let chunks = state.chunks.read().await;
-            while position < chunks.len() {
-                yield Ok(chunks[position].clone());
-                position += 1;
-            }
-            drop(chunks);
-
-            // Exit if download complete and all chunks sent
-            if state.is_complete().await && position >= state.chunk_count().await {
-                break;
-            }
-
-            // Wait for new chunks
-            state.notify.notified().await;
-        }
-    }
-}
+fn feed(urls: &Vec<String>) -> String {
+    let mut ret = String::new();
+    ret.push_str(r#"<?xml version="1.0" encoding="UTF-8"?>"#);
+    ret.push_str(r#"<rss version="2.0">"#);
+    ret.push_str("<channel>");
+    ret.push_str("<title>Ö1 Morgenjournal Feed</title>");
+    ret.push_str("<link>https://news.hofer.link</link>");
+    ret.push_str("<description>Feed für Ö1 Morgenjournal. Live.</description>");
+
+    for url in urls {
+        ret.push_str("<item>");
+        ret.push_str(&format!("<title>Morgenjournal</title>"));
+        ret.push_str(&format!("<link>{}</link>", quick_xml::escape::escape(url)));
+        ret.push_str(&format!(
+            "<enclosure url=\"{}\" length=\"0\" type=\"audio/mpeg\"/>\n",
+            quick_xml::escape::escape(url)
+        ));
+        ret.push_str(&format!("<description>Morgenjournal</description>"));
+        ret.push_str("</item>");
+    }
+
+    ret.push_str(" </channel>");
+    ret.push_str("</rss>");
+
+    ret
+}
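
feed() emits a flat RSS 2.0 document with one <item> per remembered URL. A sketch of a test pinning down the part that matters here, the escaping of URLs in <link> and <enclosure> (the test name and sample URL are made up; assumes feed is reachable from a #[cfg(test)] module via use super::*):

    #[test]
    fn feed_escapes_urls() {
        let urls = vec!["https://example.org/mj.mp3?a=1&b=2".to_string()];
        let xml = feed(&urls);

        assert!(xml.starts_with(r#"<?xml version="1.0" encoding="UTF-8"?>"#));
        // Ampersands must come out escaped in both <link> and the enclosure URL.
        assert!(xml.contains("a=1&amp;b=2"));
        assert!(xml.contains(r#"type="audio/mpeg""#));
        assert!(xml.ends_with("</rss>"));
    }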