initial prototype

This commit is contained in:
2025-07-19 15:35:04 +02:00
commit 673a0967b2
8 changed files with 1948 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
/target

1741
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

13
Cargo.toml Normal file
View File

@@ -0,0 +1,13 @@
[package]
name = "player"
version = "0.1.0"
edition = "2024"
[dependencies]
axum = "0.8"
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
reqwest = { version = "0.12", features = ["stream", "json"] }
bytes = "1"
async-stream = "0.3"
serde_json = "1.0"

37
src/downloader.rs Normal file
View File

@@ -0,0 +1,37 @@
use crate::state::AppState;
use std::sync::Arc;
use tokio_stream::StreamExt;
pub fn spawn_download_task(url: &str, state: Arc<AppState>) {
let url = url.to_string();
tokio::spawn(async move {
if let Err(e) = download_stream(&url, state).await {
eprintln!("Download failed: {}", e);
}
});
}
async fn download_stream(url: &str, state: Arc<AppState>) -> Result<(), reqwest::Error> {
println!("Starting download from: {}", url);
let response = reqwest::Client::new().get(url).send().await?;
let mut stream = response.bytes_stream();
let mut total_size = 0;
while let Some(chunk) = stream.next().await {
let bytes = chunk?;
total_size += bytes.len();
state.add_chunk(bytes).await;
if total_size % (1024 * 1024) == 0 {
// Log every MB
println!("Downloaded: {} MB", total_size / (1024 * 1024));
}
}
state.mark_complete().await;
println!("Download complete! Total: {} bytes", total_size);
Ok(())
}

45
src/lib.rs Normal file
View File

@@ -0,0 +1,45 @@
use serde_json::Value;
pub async fn newest_morning_journal_streaming_url() -> Result<String, Box<dyn std::error::Error>> {
let url = get_newest_morning_journal().await?;
get_streaming_url(url).await
}
// List of broadcasts: https://audioapi.orf.at/oe1/api/json/current/broadcasts
//
// ^ contains link, e.g. https://audioapi.orf.at/oe1/api/json/4.0/broadcast/797577/20250611
async fn get_newest_morning_journal() -> Result<String, Box<dyn std::error::Error>> {
let url = "https://audioapi.orf.at/oe1/api/json/current/broadcasts";
let data: Value = reqwest::get(url).await?.json().await?;
if let Some(days) = data.as_array() {
for day in days.iter().rev() {
if let Some(broadcasts) = day["broadcasts"].as_array() {
for broadcast in broadcasts.iter().rev() {
if broadcast["title"] == "Ö1 Morgenjournal" {
if let Some(href) = broadcast["href"].as_str() {
return Ok(href.into());
}
}
}
}
}
}
Err(String::from("No Ö1 Morgenjournal found").into())
}
async fn get_streaming_url(url: String) -> Result<String, Box<dyn std::error::Error>> {
let data: Value = reqwest::get(url).await?.json().await?;
let Some(streams) = data["streams"].as_array() else {
return Err(String::from("No 'streams' found").into());
};
assert_eq!(streams.len(), 1);
let Some(id) = streams[0]["loopStreamId"].as_str() else {
return Err(String::from("No 'loopStreamId' found").into());
};
Ok(format!(
"https://loopstream01.apa.at/?channel=oe1&shoutcast=0&id={id}"
))
}

34
src/main.rs Normal file
View File

@@ -0,0 +1,34 @@
mod downloader;
mod state;
mod streamer;
use axum::{extract::State, routing::get, Router};
use state::AppState;
use std::sync::Arc;
async fn new(State(state): State<Arc<AppState>>) -> &'static str {
let Ok(url) = player::newest_morning_journal_streaming_url().await else {
return "Failed getting latest url";
};
downloader::spawn_download_task(&url, state.clone());
"Download started. Access / to stream the new content."
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let state = Arc::new(AppState::new());
let app = Router::new()
.route("/", get(streamer::stream_handler))
.route("/new", get(new))
.with_state(state);
println!("Streaming server running on http://localhost:3000");
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
axum::serve(listener, app).await?;
Ok(())
}

36
src/state.rs Normal file
View File

@@ -0,0 +1,36 @@
use bytes::Bytes;
use tokio::sync::{Notify, RwLock};
pub struct AppState {
pub chunks: RwLock<Vec<Bytes>>,
pub complete: RwLock<bool>,
pub notify: Notify,
}
impl AppState {
pub fn new() -> Self {
Self {
chunks: RwLock::new(Vec::new()),
complete: RwLock::new(false),
notify: Notify::new(),
}
}
pub async fn add_chunk(&self, chunk: Bytes) {
self.chunks.write().await.push(chunk);
self.notify.notify_waiters();
}
pub async fn mark_complete(&self) {
*self.complete.write().await = true;
self.notify.notify_waiters();
}
pub async fn is_complete(&self) -> bool {
*self.complete.read().await
}
pub async fn chunk_count(&self) -> usize {
self.chunks.read().await.len()
}
}

41
src/streamer.rs Normal file
View File

@@ -0,0 +1,41 @@
use crate::state::AppState;
use axum::{body::Body, extract::State, response::Response};
use std::sync::Arc;
use tokio_stream::Stream;
pub async fn stream_handler(State(state): State<Arc<AppState>>) -> Response {
let stream = create_chunk_stream(state);
let body = Body::from_stream(stream);
Response::builder()
.header("Content-Type", "audio/mpeg")
.header("Cache-Control", "no-cache")
.body(body)
.unwrap()
}
fn create_chunk_stream(
state: Arc<AppState>,
) -> impl Stream<Item = Result<bytes::Bytes, std::io::Error>> {
async_stream::stream! {
let mut position = 0;
loop {
// Send available chunks
let chunks = state.chunks.read().await;
while position < chunks.len() {
yield Ok(chunks[position].clone());
position += 1;
}
drop(chunks);
// Exit if download complete and all chunks sent
if state.is_complete().await && position >= state.chunk_count().await {
break;
}
// Wait for new chunks
state.notify.notified().await;
}
}
}