remove need to call /new
All checks were successful
CI/CD Pipeline / test (push) Successful in 2m24s
CI/CD Pipeline / deploy (push) Successful in 2m8s

This commit is contained in:
2025-08-01 19:04:25 +02:00
parent 12d02b286d
commit 67fb02894f
6 changed files with 227 additions and 102 deletions

View File

@@ -1,85 +0,0 @@
use crate::state::AppState;
use std::{io::Read, sync::Arc, time::Duration};
use stream_download::{
Settings, StreamDownload, source::DecodeError, storage::temp::TempStorageProvider,
};
use tokio::sync::mpsc;
pub async fn spawn_download_task(url: &str, state: Arc<AppState>) {
state.reset().await;
let url = url.to_string();
tokio::spawn(async move {
if let Err(e) = download_stream(&url, state).await {
eprintln!("Download failed: {e:?}");
}
});
}
async fn download_stream(
url: &str,
state: Arc<AppState>,
) -> Result<(), Box<dyn std::error::Error>> {
println!("Starting download from: {url}");
let reader = match StreamDownload::new_http(
url.parse()?,
TempStorageProvider::new(),
Settings::default(),
)
.await
{
Ok(reader) => reader,
Err(e) => return Err(e.decode_error().await.into()),
};
let (tx, mut rx) = mpsc::unbounded_channel::<bytes::Bytes>();
// Spawn blocking task to read from the stream
let read_handle = tokio::task::spawn_blocking(
move || -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut reader = reader;
let mut buffer = vec![0u8; 8192]; // 8KB buffer
loop {
match reader.read(&mut buffer) {
Ok(0) => {
// End of stream
println!("End of stream reached");
break;
}
Ok(n) => {
let chunk = bytes::Bytes::copy_from_slice(&buffer[..n]);
if tx.send(chunk).is_err() {
// Receiver dropped, exit
break;
}
}
Err(e) => {
println!("Error reading from stream: {e:?}");
// Short delay before retrying
std::thread::sleep(Duration::from_millis(100));
}
}
}
Ok(())
},
);
// Process received chunks
while let Some(bytes) = rx.recv().await {
state.add_chunk(bytes).await;
}
// Wait for the reading task to complete
match read_handle.await {
Ok(result) => result.unwrap(),
Err(e) => return Err(e.into()),
}
state.mark_complete().await;
println!("Download complete!");
Ok(())
}

View File

@@ -1,28 +1,16 @@
mod downloader;
mod state;
mod streamer;
use axum::{Router, extract::State, routing::get};
use axum::{routing::get, Router};
use state::AppState;
use std::sync::Arc;
async fn new(State(state): State<Arc<AppState>>) -> &'static str {
let Ok(url) = player::newest_morning_journal_streaming_url().await else {
return "Failed getting latest url";
};
downloader::spawn_download_task(&url, state.clone()).await;
"Download started. Access / to stream the new content."
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let state = Arc::new(AppState::new());
let app = Router::new()
.route("/", get(streamer::stream_handler))
.route("/new", get(new))
.with_state(state);
println!("Streaming server running on http://localhost:3029");

View File

@@ -1,10 +1,16 @@
use bytes::Bytes;
use tokio::sync::{Notify, RwLock};
use chrono::{Local, NaiveDate};
use std::{io::Read, sync::Arc, time::Duration};
use stream_download::{
source::DecodeError, storage::temp::TempStorageProvider, Settings, StreamDownload,
};
use tokio::sync::{mpsc, Notify, RwLock};
pub struct AppState {
pub chunks: RwLock<Vec<Bytes>>,
pub complete: RwLock<bool>,
pub notify: Notify,
pub downloaded_on_day: RwLock<Option<NaiveDate>>,
}
impl AppState {
@@ -13,26 +19,117 @@ impl AppState {
chunks: RwLock::new(Vec::new()),
complete: RwLock::new(false),
notify: Notify::new(),
downloaded_on_day: RwLock::new(None),
}
}
pub async fn reset(&self) {
pub async fn check_update(self: Arc<Self>) {
let today = Local::now().date_naive();
if let Some(downloaded_on_day) = *self.downloaded_on_day.read().await {
if today == downloaded_on_day {
return;
}
}
self.reset().await;
*self.downloaded_on_day.write().await = Some(today);
let latest_url = player::newest_morning_journal_streaming_url()
.await
.unwrap();
let self_clone = Arc::clone(&self);
tokio::spawn(async move {
if let Err(e) = self_clone.download_stream(&latest_url).await {
eprintln!("Download failed: {e:?}");
}
});
}
async fn download_stream(&self, url: &str) -> Result<(), Box<dyn std::error::Error>> {
println!("Starting download from: {url}");
let reader = match StreamDownload::new_http(
url.parse()?,
TempStorageProvider::new(),
Settings::default(),
)
.await
{
Ok(reader) => reader,
Err(e) => return Err(e.decode_error().await.into()),
};
let (tx, mut rx) = mpsc::unbounded_channel::<bytes::Bytes>();
// Spawn blocking task to read from the stream
let read_handle = tokio::task::spawn_blocking(
move || -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut reader = reader;
let mut buffer = vec![0u8; 8192]; // 8KB buffer
loop {
match reader.read(&mut buffer) {
Ok(0) => {
// End of stream
println!("End of stream reached");
break;
}
Ok(n) => {
let chunk = bytes::Bytes::copy_from_slice(&buffer[..n]);
if tx.send(chunk).is_err() {
// Receiver dropped, exit
break;
}
}
Err(e) => {
println!("Error reading from stream: {e:?}");
// Short delay before retrying
std::thread::sleep(Duration::from_millis(100));
}
}
}
Ok(())
},
);
// Process received chunks
while let Some(bytes) = rx.recv().await {
self.add_chunk(bytes).await;
}
// Wait for the reading task to complete
match read_handle.await {
Ok(result) => result.unwrap(),
Err(e) => return Err(e.into()),
}
self.mark_complete().await;
println!("Download complete!");
Ok(())
}
async fn reset(&self) {
// Clear all chunks
self.chunks.write().await.clear();
// Reset completion status
*self.complete.write().await = false;
*self.downloaded_on_day.write().await = None;
// Notify any waiting tasks about the reset
self.notify.notify_waiters();
}
pub async fn add_chunk(&self, chunk: Bytes) {
async fn add_chunk(&self, chunk: Bytes) {
self.chunks.write().await.push(chunk);
self.notify.notify_waiters();
}
pub async fn mark_complete(&self) {
async fn mark_complete(&self) {
*self.complete.write().await = true;
self.notify.notify_waiters();
}

View File

@@ -4,6 +4,8 @@ use std::sync::Arc;
use tokio_stream::Stream;
pub async fn stream_handler(State(state): State<Arc<AppState>>) -> Response {
state.clone().check_update().await;
let stream = create_chunk_stream(state);
let body = Body::from_stream(stream);