From a7ab1e83602487de3e7dadb1871c64ef95f38d0a Mon Sep 17 00:00:00 2001 From: Philipp Hofer Date: Mon, 28 Jul 2025 21:29:58 +0200 Subject: [PATCH] retry failed chunk downloads; Continues #1 --- Cargo.lock | 13 +++++++++++++ Cargo.toml | 1 + src/downloader.rs | 43 ++++++++++++++++++++++++++++++++----------- 3 files changed, 46 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4a792b7..1d0cf7d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -664,6 +664,7 @@ dependencies = [ "serde_json", "tokio", "tokio-stream", + "tracing", ] [[package]] @@ -1220,9 +1221,21 @@ checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ "log", "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tracing-core" version = "0.1.34" diff --git a/Cargo.toml b/Cargo.toml index 9f77853..ae64fd6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,3 +11,4 @@ reqwest = { version = "0.12", features = ["stream", "json", "rustls-tls"], defau bytes = "1" async-stream = "0.3" serde_json = "1" +tracing = "0.1" diff --git a/src/downloader.rs b/src/downloader.rs index 715e602..6522505 100644 --- a/src/downloader.rs +++ b/src/downloader.rs @@ -1,5 +1,6 @@ use crate::state::AppState; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; +use tokio::time::timeout; use tokio_stream::StreamExt; pub async fn spawn_download_task(url: &str, state: Arc) { @@ -18,22 +19,42 @@ async fn download_stream(url: &str, state: Arc) -> Result<(), reqwest: let response = reqwest::Client::new().get(url).send().await?; let mut stream = response.bytes_stream(); - let mut total_size = 0; - while let Some(chunk) = stream.next().await { - let bytes = chunk?; - total_size += bytes.len(); + loop { + let Ok(bytes) = timeout(Duration::from_secs(5), stream.next()).await else { + println!("Error fetching stream, trying again..."); + continue; + }; - state.add_chunk(bytes).await; - - if total_size % (1024 * 1024) == 0 { - // Log every MB - println!("Downloaded: {} MB", total_size / (1024 * 1024)); + if handle_bytes(bytes, state.clone()).await { + break; } } state.mark_complete().await; - println!("Download complete! Total: {total_size} bytes"); + println!("Download complete!"); Ok(()) } + +/// Returns if the end (= no more bytes) has been received +async fn handle_bytes( + bytes: Option>, + state: Arc, +) -> bool { + // TODO: switch checks, such that `.transpose()` is not needed anymore + let bytes = match bytes.transpose() { + Ok(bytes) => bytes, + Err(e) => { + println!("Error fetching chunk from stream: {e:?}"); + return false; + } + }; + + if let Some(bytes) = bytes { + state.add_chunk(bytes).await; + false + } else { + true + } +}