retry failed chunk downloads; Continues #1
This commit is contained in:
13
Cargo.lock
generated
13
Cargo.lock
generated
@@ -664,6 +664,7 @@ dependencies = [
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -1220,9 +1221,21 @@ checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0"
|
||||
dependencies = [
|
||||
"log",
|
||||
"pin-project-lite",
|
||||
"tracing-attributes",
|
||||
"tracing-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-attributes"
|
||||
version = "0.1.30"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tracing-core"
|
||||
version = "0.1.34"
|
||||
|
@@ -11,3 +11,4 @@ reqwest = { version = "0.12", features = ["stream", "json", "rustls-tls"], defau
|
||||
bytes = "1"
|
||||
async-stream = "0.3"
|
||||
serde_json = "1"
|
||||
tracing = "0.1"
|
||||
|
@@ -1,5 +1,6 @@
|
||||
use crate::state::AppState;
|
||||
use std::sync::Arc;
|
||||
use std::{sync::Arc, time::Duration};
|
||||
use tokio::time::timeout;
|
||||
use tokio_stream::StreamExt;
|
||||
|
||||
pub async fn spawn_download_task(url: &str, state: Arc<AppState>) {
|
||||
@@ -18,22 +19,42 @@ async fn download_stream(url: &str, state: Arc<AppState>) -> Result<(), reqwest:
|
||||
|
||||
let response = reqwest::Client::new().get(url).send().await?;
|
||||
let mut stream = response.bytes_stream();
|
||||
let mut total_size = 0;
|
||||
|
||||
while let Some(chunk) = stream.next().await {
|
||||
let bytes = chunk?;
|
||||
total_size += bytes.len();
|
||||
loop {
|
||||
let Ok(bytes) = timeout(Duration::from_secs(5), stream.next()).await else {
|
||||
println!("Error fetching stream, trying again...");
|
||||
continue;
|
||||
};
|
||||
|
||||
state.add_chunk(bytes).await;
|
||||
|
||||
if total_size % (1024 * 1024) == 0 {
|
||||
// Log every MB
|
||||
println!("Downloaded: {} MB", total_size / (1024 * 1024));
|
||||
if handle_bytes(bytes, state.clone()).await {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
state.mark_complete().await;
|
||||
println!("Download complete! Total: {total_size} bytes");
|
||||
println!("Download complete!");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns if the end (= no more bytes) has been received
|
||||
async fn handle_bytes(
|
||||
bytes: Option<Result<bytes::Bytes, reqwest::Error>>,
|
||||
state: Arc<AppState>,
|
||||
) -> bool {
|
||||
// TODO: switch checks, such that `.transpose()` is not needed anymore
|
||||
let bytes = match bytes.transpose() {
|
||||
Ok(bytes) => bytes,
|
||||
Err(e) => {
|
||||
println!("Error fetching chunk from stream: {e:?}");
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(bytes) = bytes {
|
||||
state.add_chunk(bytes).await;
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user