feat: enhance asset import functionality and unit tracking

- Added WHISPER_URL environment variable to docker-compose for audio transcription service.
- Updated Nginx configuration to increase timeout settings for API requests.
- Enhanced asset ingestion process to extract unit numbers from ZIP entry paths, supporting various naming conventions.
- Implemented logic to split intensive courses into two regular courses during asset import.
- Added new fields to the Asset and QuestionBank models to track unit numbers and source asset links.
- Introduced backward-compatible fallbacks for fetching study plans and courses from legacy MySQL database.
- Improved error handling and progress tracking during ZIP file uploads in the frontend.
- Created a new SQL migration to add unit_number and source_asset_id columns to the assets and question_bank tables, along with necessary indexes for performance.
This commit is contained in:
2026-04-07 13:38:22 -04:00
parent 7f9b9d69ae
commit 024bd6e46d
11 changed files with 687 additions and 101 deletions
+256 -81
View File
@@ -16,6 +16,7 @@ use serde::{Deserialize, Serialize};
use serde_json::json;
use sqlx::PgPool;
use uuid::Uuid;
use std::collections::HashMap;
use std::env;
use std::path::Path as StdPath;
use tokio::process::Command;
@@ -585,6 +586,9 @@ pub async fn ingest_asset_for_rag(
&client,
&ollama_url,
&model,
None,
None,
asset.unit_number,
)
.await?;
@@ -601,6 +605,38 @@ pub async fn ingest_asset_for_rag(
/// - file: ZIP requerido
/// - course_id: UUID opcional
/// - ingest_rag: true/false opcional (default false)
/// Extracts a unit number from a ZIP entry path using the top-level folder name.
/// Supports: "Unit 1/...", "Unidad 1/...", "unit-01/...", "01/...", "1/..."
fn extract_unit_number(entry_name: &str) -> Option<i32> {
let parts: Vec<&str> = entry_name.splitn(2, '/').collect();
if parts.len() < 2 {
return None; // file at ZIP root — no unit folder
}
let folder = parts[0].trim();
if folder.is_empty() {
return None;
}
let lower = folder.to_lowercase();
// Strip common textual prefixes, then parse leading digits
let stripped = lower
.trim_start_matches("unidad")
.trim_start_matches("unit")
.trim_start_matches('u')
.trim_start_matches(|c: char| !c.is_ascii_digit());
let digits: String = stripped.chars().take_while(|c| c.is_ascii_digit()).collect();
digits.parse().ok()
}
struct ZipEntryData {
entry_name: String,
safe_filename: String,
content: Vec<u8>,
unit_number: Option<i32>,
guessed_mimetype: String,
is_audio_video: bool,
is_flv: bool,
}
pub async fn import_assets_zip(
Org(org_ctx): Org,
claims: Claims,
@@ -613,6 +649,9 @@ pub async fn import_assets_zip(
let mut sam_plan_id: Option<i32> = None;
let mut sam_course_id: Option<i32> = None;
let mut ingest_rag = false;
let mut split_to_regular = false;
let mut sam_course_id_r1: Option<i32> = None;
let mut sam_course_id_r2: Option<i32> = None;
while let Some(mut field) = multipart
.next_field()
@@ -678,6 +717,23 @@ pub async fn import_assets_zip(
sam_course_id = Some(id);
}
}
} else if name == "split_to_regular" {
if let Ok(txt) = field.text().await {
let v = txt.trim().to_lowercase();
split_to_regular = v == "1" || v == "true" || v == "yes";
}
} else if name == "sam_course_id_r1" {
if let Ok(txt) = field.text().await {
if let Ok(id) = txt.trim().parse::<i32>() {
sam_course_id_r1 = Some(id);
}
}
} else if name == "sam_course_id_r2" {
if let Ok(txt) = field.text().await {
if let Ok(id) = txt.trim().parse::<i32>() {
sam_course_id_r2 = Some(id);
}
}
}
}
@@ -699,11 +755,84 @@ pub async fn import_assets_zip(
return Err((StatusCode::BAD_REQUEST, "No ZIP file uploaded".to_string()));
}
// ── Phase 1: collect all ZIP entries into memory ──────────────────────────
let mut all_entries: Vec<ZipEntryData> = Vec::new();
let mut unit_set: std::collections::BTreeSet<i32> = Default::default();
let len = archive.len();
for i in 0..len {
let mut file = archive
.by_index(i)
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("ZIP read error: {}", e)))?;
if !file.is_file() {
continue;
}
let entry_name = file.name().to_string();
if entry_name.starts_with("__MACOSX/") || entry_name.ends_with(".DS_Store") {
continue;
}
let safe_filename = StdPath::new(&entry_name)
.file_name()
.and_then(|s| s.to_str())
.unwrap_or("unnamed")
.to_string();
let mut content = Vec::new();
std::io::Read::read_to_end(&mut file, &mut content)
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("ZIP entry read failed: {}", e)))?;
let guessed_mimetype = mime_guess::from_path(&safe_filename)
.first_or_octet_stream()
.to_string();
let is_flv = is_flv_media(&safe_filename, &guessed_mimetype);
let is_audio_video = is_flv
|| guessed_mimetype.starts_with("audio/")
|| guessed_mimetype.starts_with("video/");
let unit_number = extract_unit_number(&entry_name);
if let Some(u) = unit_number {
unit_set.insert(u);
}
all_entries.push(ZipEntryData {
entry_name,
safe_filename,
content,
unit_number,
guessed_mimetype,
is_audio_video,
is_flv,
});
}
// ── Phase 1b: calculate split midpoint (intensive → 2 regular courses) ───
// For 8-10 units: first half → regular 1, second half → regular 2.
// Mid is the last unit number that goes to regular 1 (ceiling of N/2).
let split_midpoint: Option<i32> = if split_to_regular
&& sam_course_id_r1.is_some()
&& sam_course_id_r2.is_some()
&& !unit_set.is_empty()
{
let units: Vec<i32> = unit_set.iter().cloned().collect();
let mid_idx = (units.len() + 1) / 2; // ceiling: 8 → 4, 9 → 5, 10 → 5
Some(units[mid_idx - 1])
} else {
None
};
// Sort: audio/video first so their asset IDs are known when text is ingested
all_entries.sort_by_key(|e| if e.is_audio_video { 0usize } else { 1 });
// ── Phase 2: process entries ───────────────────────────────────────────────
let mut imported_assets = 0usize;
let mut rag_ingested_assets = 0usize;
let mut rag_chunks_ingested = 0usize;
let mut failed_entries: Vec<String> = Vec::new();
// unit_number → (asset_id, public_url): populated from audio/video assets
let mut unit_audio_map: HashMap<i32, (Uuid, String)> = HashMap::new();
let rag_client = if ingest_rag {
Some(
reqwest::Client::builder()
@@ -718,41 +847,28 @@ pub async fn import_assets_zip(
let ollama_url = ai::get_ollama_url();
let model = ai::get_embedding_model();
let len = archive.len();
for i in 0..len {
let (entry_name, safe_filename, content): (String, String, Vec<u8>) = {
let mut file = archive
.by_index(i)
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("ZIP read error: {}", e)))?;
for entry in all_entries {
let ZipEntryData {
entry_name,
safe_filename,
content,
unit_number,
guessed_mimetype,
is_audio_video,
is_flv,
} = entry;
if !file.is_file() {
continue;
// Determine effective sam_course_id based on split midpoint
let effective_sam_course_id = match (split_midpoint, unit_number) {
(Some(mid), Some(u)) => {
if u <= mid { sam_course_id_r1 } else { sam_course_id_r2 }
}
let entry_name = file.name().to_string();
if entry_name.starts_with("__MACOSX/") || entry_name.ends_with(".DS_Store") {
continue;
}
let safe_filename = std::path::Path::new(&entry_name)
.file_name()
.and_then(|s| s.to_str())
.unwrap_or("unnamed")
.to_string();
let mut content = Vec::new();
std::io::Read::read_to_end(&mut file, &mut content)
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("ZIP entry read failed: {}", e)))?;
(entry_name, safe_filename, content)
_ => sam_course_id,
};
let asset_id = Uuid::new_v4();
let guessed_mimetype = mime_guess::from_path(&safe_filename)
.first_or_octet_stream()
.to_string();
let (storage_path, stored_filename, mimetype) = if is_flv_media(&safe_filename, &guessed_mimetype) {
let (storage_path, stored_filename, mimetype) = if is_flv {
let temp_storage_filename = format!("{}.flv", asset_id);
let temp_storage_path = format!("uploads/{}", temp_storage_filename);
tokio::fs::create_dir_all("uploads")
@@ -801,7 +917,7 @@ pub async fn import_assets_zip(
.unwrap_or("")
.to_string();
let (db_storage_path, _asset_url) = if !storage_filename_for_s3.is_empty() {
let (db_storage_path, asset_public_url) = if !storage_filename_for_s3.is_empty() {
if let Some((s3_path, public_url)) = maybe_push_local_file_to_s3(
&storage_path,
&storage_filename_for_s3,
@@ -834,8 +950,8 @@ pub async fn import_assets_zip(
let insert_result = sqlx::query(
r#"
INSERT INTO assets (id, organization_id, uploaded_by, course_id, english_level, sam_plan_id, sam_course_id, filename, storage_path, mimetype, size_bytes)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
INSERT INTO assets (id, organization_id, uploaded_by, course_id, english_level, sam_plan_id, sam_course_id, unit_number, filename, storage_path, mimetype, size_bytes)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
"#,
)
.bind(asset_id)
@@ -844,7 +960,8 @@ pub async fn import_assets_zip(
.bind(course_id)
.bind(&english_level)
.bind(sam_plan_id)
.bind(sam_course_id)
.bind(effective_sam_course_id)
.bind(unit_number)
.bind(&stored_filename)
.bind(&db_storage_path)
.bind(&mimetype)
@@ -859,6 +976,13 @@ pub async fn import_assets_zip(
imported_assets += 1;
// Track audio/video asset per unit for cross-linking with text RAG chunks
if is_audio_video {
if let Some(u) = unit_number {
unit_audio_map.entry(u).or_insert((asset_id, asset_public_url.clone()));
}
}
if ingest_rag {
let asset = Asset {
id: asset_id,
@@ -867,7 +991,8 @@ pub async fn import_assets_zip(
course_id,
english_level: english_level.clone(),
sam_plan_id,
sam_course_id,
sam_course_id: effective_sam_course_id,
unit_number,
filename: stored_filename.clone(),
storage_path: db_storage_path.clone(),
mimetype: mimetype.clone(),
@@ -875,6 +1000,16 @@ pub async fn import_assets_zip(
created_at: chrono::Utc::now(),
};
// For text/PDF entries, look up the audio asset from the same unit
let (linked_audio_id, linked_audio_url) = if !is_audio_video {
match unit_number.and_then(|u| unit_audio_map.get(&u)) {
Some((aid, aurl)) => (Some(*aid), Some(aurl.clone())),
None => (None, None),
}
} else {
(None, None)
};
match extract_asset_text(&asset).await {
Ok(extracted) => {
let trimmed = extracted.trim();
@@ -889,7 +1024,7 @@ pub async fn import_assets_zip(
continue;
}
let source_kind = if mimetype.starts_with("audio/") || mimetype.starts_with("video/") {
let source_kind = if is_audio_video {
"audio-transcription"
} else if mimetype.contains("pdf") {
"pdf"
@@ -897,7 +1032,7 @@ pub async fn import_assets_zip(
"text"
};
let skill = if mimetype.starts_with("audio/") || mimetype.starts_with("video/") {
let skill = if is_audio_video {
Some("listening")
} else {
Some("reading")
@@ -915,6 +1050,9 @@ pub async fn import_assets_zip(
client,
&ollama_url,
&model,
linked_audio_id,
linked_audio_url,
unit_number,
)
.await
{
@@ -1007,6 +1145,9 @@ async fn ingest_chunks_to_question_bank(
client: &reqwest::Client,
ollama_url: &str,
model: &str,
source_asset_id: Option<Uuid>,
audio_url: Option<String>,
unit_number: Option<i32>,
) -> Result<(), (StatusCode, String)> {
for (idx, chunk) in chunks.iter().enumerate() {
let metadata = json!({
@@ -1017,6 +1158,7 @@ async fn ingest_chunks_to_question_bank(
"source_kind": source_kind,
"chunk_index": idx + 1,
"chunk_total": chunks.len(),
"unit_number": unit_number,
});
let inserted_id: Uuid = sqlx::query_scalar(
@@ -1031,10 +1173,13 @@ async fn ingest_chunks_to_question_bank(
skill_assessed,
source,
source_metadata,
source_asset_id,
audio_url,
unit_number,
is_active,
is_archived
)
VALUES ($1, $2, $3, 'short-answer', $4, 'medium', $5, 'imported-material', $6, true, false)
VALUES ($1, $2, $3, 'short-answer', $4, 'medium', $5, 'imported-material', $6, $7, $8, $9, true, false)
RETURNING id
"#,
)
@@ -1044,6 +1189,9 @@ async fn ingest_chunks_to_question_bank(
.bind("RAG material chunk from uploaded asset")
.bind(skill)
.bind(&metadata)
.bind(source_asset_id)
.bind(&audio_url)
.bind(unit_number)
.fetch_one(pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Insert failed: {}", e)))?;
@@ -1139,54 +1287,81 @@ async fn extract_pdf_text_from_bytes(bytes: Vec<u8>) -> Result<String, (StatusCo
}
async fn transcribe_media_bytes(file_data: Vec<u8>, filename: &str) -> Result<String, (StatusCode, String)> {
let whisper_url = std::env::var("WHISPER_URL")
.unwrap_or_else(|_| "http://localhost:8000".to_string());
let client = reqwest::Client::new();
let form = reqwest::multipart::Form::new()
.part(
"file",
reqwest::multipart::Part::bytes(file_data).file_name(filename.to_string()),
)
.text("model", "whisper-1")
.text("response_format", "json");
let response = client
.post(format!("{}/v1/audio/transcriptions", whisper_url))
.multipart(form)
.send()
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Whisper request failed: {}", e)))?;
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
return Err((
StatusCode::BAD_GATEWAY,
format!("Whisper API error {}: {}", status, body),
));
let mut whisper_urls: Vec<String> = Vec::new();
if let Ok(url) = std::env::var("WHISPER_URL") {
let trimmed = url.trim();
if !trimmed.is_empty() {
whisper_urls.push(trimmed.trim_end_matches('/').to_string());
}
}
let transcription: serde_json::Value = response
.json()
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Invalid Whisper response: {}", e)))?;
let text = transcription
.get("text")
.and_then(|v| v.as_str())
.unwrap_or("")
.trim()
.to_string();
if text.is_empty() {
return Err((
StatusCode::BAD_REQUEST,
"Whisper no pudo extraer texto del audio/video".to_string(),
));
// Container-friendly fallbacks for common local deployments.
if whisper_urls.is_empty() {
whisper_urls.push("http://host.docker.internal:8000".to_string());
whisper_urls.push("http://localhost:8000".to_string());
}
Ok(text)
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(300))
.build()
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Whisper HTTP client error: {}", e)))?;
let mut last_error = String::new();
for base_url in whisper_urls {
let form = reqwest::multipart::Form::new()
.part(
"file",
reqwest::multipart::Part::bytes(file_data.clone()).file_name(filename.to_string()),
)
.text("model", "whisper-1")
.text("response_format", "json");
let endpoint = format!("{}/v1/audio/transcriptions", base_url);
let response = match client.post(&endpoint).multipart(form).send().await {
Ok(r) => r,
Err(e) => {
last_error = format!("{} ({})", endpoint, e);
continue;
}
};
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
last_error = format!("{} -> {}: {}", endpoint, status, body);
continue;
}
let transcription: serde_json::Value = response
.json()
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Invalid Whisper response: {}", e)))?;
let text = transcription
.get("text")
.and_then(|v| v.as_str())
.unwrap_or("")
.trim()
.to_string();
if text.is_empty() {
return Err((
StatusCode::BAD_REQUEST,
"Whisper no pudo extraer texto del audio/video".to_string(),
));
}
return Ok(text);
}
Err((
StatusCode::BAD_GATEWAY,
format!(
"Whisper request failed en todos los endpoints configurados. Ultimo error: {}",
last_error
),
))
}
fn chunk_text(text: &str, max_chars: usize) -> Vec<String> {