switch to external downloader; Fixes #1 (hopefully...)
All checks were successful
CI/CD Pipeline / test (push) Successful in 5m31s
CI/CD Pipeline / deploy (push) Successful in 9m2s

This commit is contained in:
2025-07-30 20:55:01 +02:00
parent a7ab1e8360
commit 45e5c1954d
4 changed files with 333 additions and 93 deletions

View File

@@ -1,7 +1,9 @@
use crate::state::AppState;
use std::{sync::Arc, time::Duration};
use tokio::time::timeout;
use tokio_stream::StreamExt;
use std::{io::Read, sync::Arc, time::Duration};
use stream_download::{
source::DecodeError, storage::temp::TempStorageProvider, Settings, StreamDownload,
};
use tokio::sync::mpsc;
pub async fn spawn_download_task(url: &str, state: Arc<AppState>) {
state.reset().await;
@@ -14,21 +16,66 @@ pub async fn spawn_download_task(url: &str, state: Arc<AppState>) {
});
}
async fn download_stream(url: &str, state: Arc<AppState>) -> Result<(), reqwest::Error> {
async fn download_stream(
url: &str,
state: Arc<AppState>,
) -> Result<(), Box<dyn std::error::Error>> {
println!("Starting download from: {url}");
let response = reqwest::Client::new().get(url).send().await?;
let mut stream = response.bytes_stream();
let reader = match StreamDownload::new_http(
url.parse()?,
TempStorageProvider::new(),
Settings::default(),
)
.await
{
Ok(reader) => reader,
Err(e) => return Err(e.decode_error().await.into()),
};
loop {
let Ok(bytes) = timeout(Duration::from_secs(5), stream.next()).await else {
println!("Error fetching stream, trying again...");
continue;
};
let (tx, mut rx) = mpsc::unbounded_channel::<bytes::Bytes>();
if handle_bytes(bytes, state.clone()).await {
break;
}
// Spawn blocking task to read from the stream
let read_handle = tokio::task::spawn_blocking(
move || -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut reader = reader;
let mut buffer = vec![0u8; 8192]; // 8KB buffer
loop {
match reader.read(&mut buffer) {
Ok(0) => {
// End of stream
println!("End of stream reached");
break;
}
Ok(n) => {
let chunk = bytes::Bytes::copy_from_slice(&buffer[..n]);
if tx.send(chunk).is_err() {
// Receiver dropped, exit
break;
}
}
Err(e) => {
println!("Error reading from stream: {e:?}");
// Short delay before retrying
std::thread::sleep(Duration::from_millis(100));
}
}
}
Ok(())
},
);
// Process received chunks
while let Some(bytes) = rx.recv().await {
state.add_chunk(bytes).await;
}
// Wait for the reading task to complete
match read_handle.await {
Ok(result) => result.unwrap(),
Err(e) => return Err(e.into()),
}
state.mark_complete().await;
@@ -36,25 +83,3 @@ async fn download_stream(url: &str, state: Arc<AppState>) -> Result<(), reqwest:
Ok(())
}
/// Returns if the end (= no more bytes) has been received
async fn handle_bytes(
bytes: Option<Result<bytes::Bytes, reqwest::Error>>,
state: Arc<AppState>,
) -> bool {
// TODO: switch checks, such that `.transpose()` is not needed anymore
let bytes = match bytes.transpose() {
Ok(bytes) => bytes,
Err(e) => {
println!("Error fetching chunk from stream: {e:?}");
return false;
}
};
if let Some(bytes) = bytes {
state.add_chunk(bytes).await;
false
} else {
true
}
}

View File

@@ -35,6 +35,7 @@ async fn get_streaming_url(url: String) -> Result<String, Box<dyn std::error::Er
return Err(String::from("No 'streams' found").into());
};
assert_eq!(streams.len(), 1);
let Some(id) = streams[0]["loopStreamId"].as_str() else {
return Err(String::from("No 'loopStreamId' found").into());
};