diff --git a/.env.dev.example b/.env.dev.example index 99dc107..024e75c 100644 --- a/.env.dev.example +++ b/.env.dev.example @@ -54,6 +54,8 @@ SAM_DATABASE_URL=mysql://user:password@host:3306/sige_sam_v3 LOCAL_VIDEO_BRIDGE_URL=http://localhost:8000 EMBEDDING_MODEL=nomic-embed-text LOCAL_LLM_MODEL=llama3.2:3b +# Workers para procesamiento RAG asíncrono post-ZIP ("túneles"): 1..12 +ZIP_RAG_CONCURRENCY=5 # Backend-to-backend (LMS -> CMS) CMS_API_URL=http://studio:3001 diff --git a/nginx/studio.conf b/nginx/studio.conf index f08734c..4d386ba 100644 --- a/nginx/studio.conf +++ b/nginx/studio.conf @@ -3,11 +3,30 @@ # Allow large ZIP uploads (RAG bulk import can exceed 2GB). client_max_body_size 4096m; -client_body_timeout 1800s; +client_body_timeout 43200s; # API routes that need to go to port 3001 # Prefer the explicit `/cms-api/*` prefix for frontend fetches. This avoids collisions # with Next.js pages like `/courses` and `/admin` that share the same host. +location = /cms-api/api/assets/import-zip { + # Upload/import of large ZIPs can run for several minutes. + # Keep this route unbuffered and with very high upstream timeouts. + rewrite ^/cms-api/(.*)$ /$1 break; + proxy_pass http://openccb-studio:3001; + proxy_request_buffering off; + proxy_buffering off; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $http_x_forwarded_proto; + proxy_set_header Connection ""; + proxy_http_version 1.1; + proxy_connect_timeout 7200s; + proxy_send_timeout 43200s; + proxy_read_timeout 43200s; + send_timeout 43200s; +} + location /cms-api/ { # CORS safety net at proxy level for CMS API. set $cors_origin ""; @@ -42,9 +61,9 @@ location /cms-api/ { proxy_set_header Connection ""; proxy_http_version 1.1; proxy_connect_timeout 300s; - proxy_send_timeout 7200s; - proxy_read_timeout 7200s; - send_timeout 7200s; + proxy_send_timeout 43200s; + proxy_read_timeout 43200s; + send_timeout 43200s; } location /lms-api/ { diff --git a/roadmap.md b/roadmap.md index fe26dac..3c72b0b 100644 --- a/roadmap.md +++ b/roadmap.md @@ -121,7 +121,4 @@ 1. Finalización de **Certificados y Progreso Real**. 2. Despliegue de **Infraestructura SMTP** para comunicación global. 3. Auditoría de **Accesibilidad Universal (WCAG)**. -4. Implementación de **IA de Moderación (Seguridad)**. - - -asdf \ No newline at end of file +4. Implementación de **IA de Moderación (Seguridad)**. \ No newline at end of file diff --git a/services/cms-service/migrations/20260417000001_background_tasks_zip_rag.sql b/services/cms-service/migrations/20260417000001_background_tasks_zip_rag.sql new file mode 100644 index 0000000..a5d075c --- /dev/null +++ b/services/cms-service/migrations/20260417000001_background_tasks_zip_rag.sql @@ -0,0 +1,23 @@ +CREATE TABLE IF NOT EXISTS background_tasks ( + id UUID PRIMARY KEY, + organization_id UUID NOT NULL, + created_by UUID NOT NULL, + title TEXT NOT NULL, + course_title TEXT, + task_type VARCHAR(64) NOT NULL, + status VARCHAR(20) NOT NULL DEFAULT 'queued', + progress INTEGER NOT NULL DEFAULT 0, + total_items INTEGER NOT NULL DEFAULT 0, + processed_items INTEGER NOT NULL DEFAULT 0, + failed_items INTEGER NOT NULL DEFAULT 0, + error_message TEXT, + metadata JSONB NOT NULL DEFAULT '{}'::jsonb, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS idx_background_tasks_org_status_updated + ON background_tasks (organization_id, status, updated_at DESC); + +CREATE INDEX IF NOT EXISTS idx_background_tasks_type_status + ON background_tasks (task_type, status); diff --git a/services/cms-service/src/handlers/tasks.rs b/services/cms-service/src/handlers/tasks.rs index 44f2d30..ef659c4 100644 --- a/services/cms-service/src/handlers/tasks.rs +++ b/services/cms-service/src/handlers/tasks.rs @@ -23,20 +23,37 @@ pub async fn get_background_tasks( State(pool): State, ) -> Result>, (StatusCode, String)> { let query = r#" - SELECT - l.id, - l.title, - c.title as course_title, - 'lesson_transcription' as task_type, - l.transcription_status as status, - 0 as progress, - l.updated_at - FROM lessons l - JOIN modules m ON l.module_id = m.id - JOIN courses c ON m.course_id = c.id - WHERE l.transcription_status IN ('queued', 'processing', 'failed') - + SELECT id, title, course_title, task_type, status, progress, updated_at + FROM ( + SELECT + l.id, + l.title, + c.title as course_title, + 'lesson_transcription' as task_type, + l.transcription_status as status, + 0 as progress, + l.updated_at + FROM lessons l + JOIN modules m ON l.module_id = m.id + JOIN courses c ON m.course_id = c.id + WHERE l.transcription_status IN ('queued', 'processing', 'failed') + + UNION ALL + + SELECT + t.id, + t.title, + t.course_title, + t.task_type, + t.status, + t.progress, + t.updated_at + FROM background_tasks t + WHERE t.task_type = 'zip_rag_import' + AND t.status IN ('queued', 'processing', 'failed', 'completed') + ) merged ORDER BY updated_at DESC + LIMIT 200 "#; let tasks = sqlx::query_as::<_, BackgroundTask>(query) diff --git a/services/cms-service/src/handlers_assets.rs b/services/cms-service/src/handlers_assets.rs index 84e5411..2a50354 100644 --- a/services/cms-service/src/handlers_assets.rs +++ b/services/cms-service/src/handlers_assets.rs @@ -20,6 +20,7 @@ use uuid::Uuid; use std::collections::HashMap; use std::env; use std::path::Path as StdPath; +use std::sync::Arc; use tokio::process::Command; use tokio::io::AsyncWriteExt; use tokio::task::JoinSet; @@ -47,6 +48,8 @@ pub struct AssetZipImportResponse { pub rag_ingested_assets: usize, pub rag_chunks_ingested: usize, pub failed_entries: Vec, + pub rag_background_started: bool, + pub rag_background_items: usize, } #[derive(Debug, Deserialize)] @@ -133,6 +136,18 @@ fn build_s3_object_key(org_id: Uuid, course_id: Option, storage_filename: } } +fn build_ready_for_rag_path(org_id: Uuid, asset_id: Uuid, filename: &str) -> String { + let ext = StdPath::new(filename) + .extension() + .and_then(|s| s.to_str()) + .unwrap_or(""); + if ext.is_empty() { + format!("uploads/ready-for-rag/{}/{}", org_id, asset_id) + } else { + format!("uploads/ready-for-rag/{}/{}.{}", org_id, asset_id, ext) + } +} + fn build_s3_public_url(settings: &S3Settings, key: &str) -> String { if let Some(base) = &settings.public_base_url { return format!("{}/{}", base.trim_end_matches('/'), key); @@ -700,6 +715,109 @@ struct ZipEntryData { is_flv: bool, } +struct PendingZipRagItem { + entry_name: String, + asset: Asset, + is_audio_video: bool, + unit_number: Option, +} + +async fn create_zip_rag_background_task( + pool: &PgPool, + org_id: Uuid, + user_id: Uuid, + course_id: Option, + total_items: usize, +) -> Result { + let task_id = Uuid::new_v4(); + let course_title = if let Some(cid) = course_id { + sqlx::query_scalar::<_, String>("SELECT title FROM courses WHERE id = $1") + .bind(cid) + .fetch_optional(pool) + .await? + } else { + None + }; + + sqlx::query( + r#" + INSERT INTO background_tasks ( + id, + organization_id, + created_by, + title, + course_title, + task_type, + status, + progress, + total_items, + processed_items, + failed_items, + metadata, + created_at, + updated_at + ) + VALUES ( + $1, $2, $3, + 'ZIP import RAG processing', + $4, + 'zip_rag_import', + 'queued', + 0, + $5, + 0, + 0, + '{}'::jsonb, + NOW(), + NOW() + ) + "#, + ) + .bind(task_id) + .bind(org_id) + .bind(user_id) + .bind(course_title) + .bind(total_items as i32) + .execute(pool) + .await?; + + Ok(task_id) +} + +async fn set_zip_rag_task_status( + pool: &PgPool, + task_id: Uuid, + status: &str, + progress: i32, + processed_items: usize, + failed_items: usize, + error_message: Option<&str>, +) -> Result<(), sqlx::Error> { + sqlx::query( + r#" + UPDATE background_tasks + SET status = $2, + progress = $3, + processed_items = $4, + failed_items = $5, + error_message = $6, + updated_at = NOW() + WHERE id = $1 + AND task_type = 'zip_rag_import' + "#, + ) + .bind(task_id) + .bind(status) + .bind(progress) + .bind(processed_items as i32) + .bind(failed_items as i32) + .bind(error_message) + .execute(pool) + .await?; + + Ok(()) +} + async fn process_zip_entry_without_rag( entry: ZipEntryData, org_id: Uuid, @@ -714,6 +832,8 @@ async fn process_zip_entry_without_rag( sam_course_id_r2: Option, s3_settings: Option, s3_client: Option, + use_dev_processing: bool, + ingest_rag: bool, ) -> Result<(), String> { let ZipEntryData { entry_name, @@ -735,39 +855,76 @@ async fn process_zip_entry_without_rag( let asset_id = Uuid::new_v4(); let (storage_path, stored_filename, mimetype) = if is_flv { - let temp_storage_filename = format!("{}.flv", asset_id); - let temp_storage_path = format!("uploads/{}", temp_storage_filename); - tokio::fs::write(&temp_storage_path, &content) - .await - .map_err(|e| format!("{}: Error en la escritura local ({})", entry_name, e))?; - - let final_storage_filename = format!("{}.mp4", asset_id); - let final_storage_path = format!("uploads/{}", final_storage_filename); - if let Err((_, msg)) = transcode_flv_to_mp4(&temp_storage_path, &final_storage_path).await { - let _ = tokio::fs::remove_file(&temp_storage_path).await; - return Err(format!("{}: la transcodificación de flv falló ({})", entry_name, msg)); - } - let _ = tokio::fs::remove_file(&temp_storage_path).await; - - ( - final_storage_path, - replace_extension(&safe_filename, "mp4"), - "video/mp4".to_string(), - ) - } else { - let extension = StdPath::new(&safe_filename) - .extension() - .and_then(|s| s.to_str()) - .unwrap_or(""); - - let storage_filename = if extension.is_empty() { - asset_id.to_string() + if use_dev_processing && ingest_rag { + let storage_path = build_ready_for_rag_path(org_id, asset_id, &format!("{}.flv", asset_id)); + tokio::fs::create_dir_all(StdPath::new(&storage_path).parent().unwrap_or(StdPath::new("."))) + .await + .map_err(|e| format!("{}: Error creating ready-for-rag dir ({})", entry_name, e))?; + tokio::fs::write(&storage_path, &content) + .await + .map_err(|e| format!("{}: Error en la escritura local ({})", entry_name, e))?; + ( + storage_path, + safe_filename.clone(), + if guessed_mimetype.is_empty() { "video/x-flv".to_string() } else { guessed_mimetype.clone() }, + ) + } else if use_dev_processing { + let storage_filename = format!("{}.flv", asset_id); + let storage_path = format!("uploads/{}", storage_filename); + tokio::fs::write(&storage_path, &content) + .await + .map_err(|e| format!("{}: Error en la escritura local ({})", entry_name, e))?; + ( + storage_path, + safe_filename.clone(), + if guessed_mimetype.is_empty() { "video/x-flv".to_string() } else { guessed_mimetype.clone() }, + ) } else { - format!("{}.{}", asset_id, extension) - }; - let storage_path = format!("uploads/{}", storage_filename); + let temp_storage_filename = format!("{}.flv", asset_id); + let temp_storage_path = format!("uploads/{}", temp_storage_filename); + tokio::fs::write(&temp_storage_path, &content) + .await + .map_err(|e| format!("{}: Error en la escritura local ({})", entry_name, e))?; - (storage_path, safe_filename.clone(), guessed_mimetype) + let final_storage_filename = format!("{}.mp4", asset_id); + let final_storage_path = format!("uploads/{}", final_storage_filename); + if let Err((_, msg)) = transcode_flv_to_mp4(&temp_storage_path, &final_storage_path).await { + let _ = tokio::fs::remove_file(&temp_storage_path).await; + return Err(format!("{}: la transcodificación de flv falló ({})", entry_name, msg)); + } + let _ = tokio::fs::remove_file(&temp_storage_path).await; + + ( + final_storage_path, + replace_extension(&safe_filename, "mp4"), + "video/mp4".to_string(), + ) + } + } else { + if use_dev_processing && ingest_rag { + let storage_path = build_ready_for_rag_path(org_id, asset_id, &safe_filename); + tokio::fs::create_dir_all(StdPath::new(&storage_path).parent().unwrap_or(StdPath::new("."))) + .await + .map_err(|e| format!("{}: Error creating ready-for-rag dir ({})", entry_name, e))?; + tokio::fs::write(&storage_path, &content) + .await + .map_err(|e| format!("{}: Error en la escritura local ({})", entry_name, e))?; + (storage_path, safe_filename.clone(), guessed_mimetype) + } else { + let extension = StdPath::new(&safe_filename) + .extension() + .and_then(|s| s.to_str()) + .unwrap_or(""); + + let storage_filename = if extension.is_empty() { + asset_id.to_string() + } else { + format!("{}.{}", asset_id, extension) + }; + let storage_path = format!("uploads/{}", storage_filename); + + (storage_path, safe_filename.clone(), guessed_mimetype) + } }; let storage_filename_for_s3 = StdPath::new(&storage_path) @@ -858,6 +1015,7 @@ pub async fn import_assets_zip( let mut split_to_regular = false; let mut sam_course_id_r1: Option = None; let mut sam_course_id_r2: Option = None; + let mut use_dev_processing = false; while let Some(mut field) = multipart .next_field() @@ -940,6 +1098,11 @@ pub async fn import_assets_zip( sam_course_id_r2 = Some(id); } } + } else if name == "use_dev_processing" { + if let Ok(txt) = field.text().await { + let v = txt.trim().to_lowercase(); + use_dev_processing = v == "1" || v == "true" || v == "yes"; + } } } @@ -1046,22 +1209,28 @@ pub async fn import_assets_zip( let mut rag_ingested_assets = 0usize; let mut rag_chunks_ingested = 0usize; let mut failed_entries: Vec = Vec::new(); + let mut pending_rag_items: Vec = Vec::new(); // unit_number → (asset_id, public_url): populated from audio/video assets let mut unit_audio_map: HashMap = HashMap::new(); - let rag_client = if ingest_rag { - Some( - reqwest::Client::builder() - .danger_accept_invalid_certs(true) - .danger_accept_invalid_hostnames(true) - .build() - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("HTTP client error: {}", e)))?, - ) + let ollama_url = if use_dev_processing { + std::env::var("ZIP_DEV_OLLAMA_URL") + .ok() + .filter(|v| !v.trim().is_empty()) + .or_else(|| std::env::var("DEV_OLLAMA_URL").ok().filter(|v| !v.trim().is_empty())) + .unwrap_or_else(ai::get_ollama_url) + } else { + ai::get_ollama_url() + }; + let whisper_url_override = if use_dev_processing { + std::env::var("ZIP_DEV_WHISPER_URL") + .ok() + .filter(|v| !v.trim().is_empty()) + .or_else(|| std::env::var("DEV_WHISPER_URL").ok().filter(|v| !v.trim().is_empty())) } else { None }; - let ollama_url = ai::get_ollama_url(); let model = ai::get_embedding_model(); if !ingest_rag { @@ -1105,6 +1274,8 @@ pub async fn import_assets_zip( sam_course_id_r2, s3_settings_cl, s3_client_cl, + use_dev_processing, + false, ) .await }); @@ -1125,6 +1296,8 @@ pub async fn import_assets_zip( rag_ingested_assets: 0, rag_chunks_ingested: 0, failed_entries, + rag_background_started: false, + rag_background_items: 0, })); } @@ -1150,36 +1323,64 @@ pub async fn import_assets_zip( let asset_id = Uuid::new_v4(); let (storage_path, stored_filename, mimetype) = if is_flv { - let temp_storage_filename = format!("{}.flv", asset_id); - let temp_storage_path = format!("uploads/{}", temp_storage_filename); - tokio::fs::write(&temp_storage_path, &content) - .await - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; - - let final_storage_filename = format!("{}.mp4", asset_id); - let final_storage_path = format!("uploads/{}", final_storage_filename); - if let Err((_, msg)) = transcode_flv_to_mp4(&temp_storage_path, &final_storage_path).await { - let _ = tokio::fs::remove_file(&temp_storage_path).await; - failed_entries.push(format!("{}: flv transcode failed ({})", entry_name, msg)); - continue; - } - let _ = tokio::fs::remove_file(&temp_storage_path).await; - - (final_storage_path, replace_extension(&safe_filename, "mp4"), "video/mp4".to_string()) - } else { - let extension = StdPath::new(&safe_filename) - .extension() - .and_then(|s| s.to_str()) - .unwrap_or(""); - - let storage_filename = if extension.is_empty() { - asset_id.to_string() + if use_dev_processing { + let storage_path = build_ready_for_rag_path(org_ctx.id, asset_id, &format!("{}.flv", asset_id)); + tokio::fs::create_dir_all(StdPath::new(&storage_path).parent().unwrap_or(StdPath::new("."))) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error creating ready-for-rag dir: {}", e)))?; + if let Err(e) = tokio::fs::write(&storage_path, &content).await { + failed_entries.push(format!("{}: local write failed ({})", entry_name, e)); + continue; + } + ( + storage_path, + safe_filename.clone(), + if guessed_mimetype.is_empty() { "video/x-flv".to_string() } else { guessed_mimetype.clone() }, + ) } else { - format!("{}.{}", asset_id, extension) - }; - let storage_path = format!("uploads/{}", storage_filename); + let temp_storage_filename = format!("{}.flv", asset_id); + let temp_storage_path = format!("uploads/{}", temp_storage_filename); + tokio::fs::write(&temp_storage_path, &content) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; - (storage_path, safe_filename.clone(), guessed_mimetype) + let final_storage_filename = format!("{}.mp4", asset_id); + let final_storage_path = format!("uploads/{}", final_storage_filename); + if let Err((_, msg)) = transcode_flv_to_mp4(&temp_storage_path, &final_storage_path).await { + let _ = tokio::fs::remove_file(&temp_storage_path).await; + failed_entries.push(format!("{}: flv transcode failed ({})", entry_name, msg)); + continue; + } + let _ = tokio::fs::remove_file(&temp_storage_path).await; + + (final_storage_path, replace_extension(&safe_filename, "mp4"), "video/mp4".to_string()) + } + } else { + if use_dev_processing { + let storage_path = build_ready_for_rag_path(org_ctx.id, asset_id, &safe_filename); + tokio::fs::create_dir_all(StdPath::new(&storage_path).parent().unwrap_or(StdPath::new("."))) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error creating ready-for-rag dir: {}", e)))?; + if let Err(e) = tokio::fs::write(&storage_path, &content).await { + failed_entries.push(format!("{}: local write failed ({})", entry_name, e)); + continue; + } + (storage_path, safe_filename.clone(), guessed_mimetype) + } else { + let extension = StdPath::new(&safe_filename) + .extension() + .and_then(|s| s.to_str()) + .unwrap_or(""); + + let storage_filename = if extension.is_empty() { + asset_id.to_string() + } else { + format!("{}.{}", asset_id, extension) + }; + let storage_path = format!("uploads/{}", storage_filename); + + (storage_path, safe_filename.clone(), guessed_mimetype) + } }; let storage_filename_for_s3 = StdPath::new(&storage_path) @@ -1297,77 +1498,319 @@ pub async fn import_assets_zip( created_at: chrono::Utc::now(), }; - // For text/PDF entries, look up the audio asset from the same unit - let (linked_audio_id, linked_audio_url) = if !is_audio_video { - match unit_number.and_then(|u| unit_audio_map.get(&u)) { - Some((aid, aurl)) => (Some(*aid), Some(aurl.clone())), - None => (None, None), + pending_rag_items.push(PendingZipRagItem { + entry_name, + asset, + is_audio_video, + unit_number, + }); + } + } + + let mut rag_background_started = false; + let mut rag_background_items = 0usize; + + if ingest_rag && !pending_rag_items.is_empty() { + let pool_bg = pool.clone(); + let org_id_bg = org_ctx.id; + let user_id_bg = claims.sub; + let ollama_url_bg = ollama_url.clone(); + let whisper_url_bg = whisper_url_override.clone(); + let model_bg = model.clone(); + let unit_audio_map_bg = unit_audio_map.clone(); + let queued_count = pending_rag_items.len(); + let task_id = match create_zip_rag_background_task( + &pool, + org_ctx.id, + claims.sub, + course_id, + queued_count, + ) + .await + { + Ok(id) => Some(id), + Err(e) => { + tracing::warn!("ZIP async RAG: no se pudo crear background task ({})", e); + None + } + }; + let rag_concurrency = env::var("ZIP_RAG_CONCURRENCY") + .ok() + .and_then(|v| v.parse::().ok()) + .map(|v| v.clamp(1, 12)) + .unwrap_or(5); + rag_background_started = true; + rag_background_items = queued_count; + + tokio::spawn(async move { + if let Some(tid) = task_id { + let _ = set_zip_rag_task_status(&pool_bg, tid, "processing", 0, 0, 0, None).await; + } + + let client = match reqwest::Client::builder() + .danger_accept_invalid_certs(true) + .danger_accept_invalid_hostnames(true) + .build() + { + Ok(c) => c, + Err(e) => { + tracing::error!("ZIP async RAG: no se pudo crear cliente HTTP: {}", e); + return; } - } else { - (None, None) }; - match extract_asset_text(&asset).await { - Ok(extracted) => { - let trimmed = extracted.trim(); - if trimmed.len() < 80 { - failed_entries.push(format!("{}: contenido insuficiente para RAG", entry_name)); - continue; - } + let mut ingested_assets = 0usize; + let mut ingested_chunks = 0usize; + let mut processed_items = 0usize; + let mut failed_items = 0usize; - let chunks = chunk_text(trimmed, 900); - if chunks.is_empty() { - failed_entries.push(format!("{}: no se pudieron generar chunks", entry_name)); - continue; - } + let mut pending_rag_items = pending_rag_items; + let mut unit_audio_map_bg = unit_audio_map_bg; - let source_kind = if is_audio_video { + for item in pending_rag_items.iter_mut() { + if !is_flv_media(&item.asset.filename, &item.asset.mimetype) { + continue; + } + + match normalize_flv_asset_for_rag(&pool_bg, &mut item.asset).await { + Ok(()) => { + if item.is_audio_video { + if let Some(u) = item.unit_number { + unit_audio_map_bg.insert( + u, + (item.asset.id, build_public_url_from_storage_path(&item.asset.storage_path)), + ); + } + } + } + Err((_, msg)) => { + tracing::warn!( + "ZIP async RAG: no se pudo normalizar FLV {} ({})", + item.entry_name, + msg + ); + } + } + } + + let unit_audio_map_bg = Arc::new(unit_audio_map_bg); + + let mut join_set: JoinSet<(usize, usize, bool)> = JoinSet::new(); + + for item in pending_rag_items { + while join_set.len() >= rag_concurrency { + match join_set.join_next().await { + Some(Ok((assets_ok, chunks_ok, failed))) => { + ingested_assets += assets_ok; + ingested_chunks += chunks_ok; + processed_items += 1; + if failed { + failed_items += 1; + } + if let Some(tid) = task_id { + let progress = ((processed_items * 100) / queued_count.max(1)) as i32; + let _ = set_zip_rag_task_status( + &pool_bg, + tid, + "processing", + progress, + processed_items, + failed_items, + None, + ) + .await; + } + } + Some(Err(e)) => { + processed_items += 1; + failed_items += 1; + tracing::warn!("ZIP async RAG: worker fallo ({})", e); + if let Some(tid) = task_id { + let progress = ((processed_items * 100) / queued_count.max(1)) as i32; + let _ = set_zip_rag_task_status( + &pool_bg, + tid, + "processing", + progress, + processed_items, + failed_items, + None, + ) + .await; + } + } + None => break, + } + } + + let pool_w = pool_bg.clone(); + let client_w = client.clone(); + let ollama_url_w = ollama_url_bg.clone(); + let whisper_url_w = whisper_url_bg.clone(); + let model_w = model_bg.clone(); + let audio_map_w = unit_audio_map_bg.clone(); + + join_set.spawn(async move { + let source_kind = if item.is_audio_video { "audio-transcription" - } else if mimetype.contains("pdf") { + } else if item.asset.mimetype.contains("pdf") { "pdf" } else { "text" }; - let skill = if is_audio_video { + let skill = if item.is_audio_video { Some("listening") } else { Some("reading") }; - if let Some(client) = &rag_client { - match ingest_chunks_to_question_bank( - &pool, - org_ctx.id, - claims.sub, - &asset, - source_kind, - skill, - &chunks, - client, - &ollama_url, - &model, - linked_audio_id, - linked_audio_url, - unit_number, - ) - .await - { - Ok(()) => { - rag_ingested_assets += 1; - rag_chunks_ingested += chunks.len(); + let (linked_audio_id, linked_audio_url) = if !item.is_audio_video { + match item.unit_number.and_then(|u| audio_map_w.get(&u)) { + Some((aid, aurl)) => (Some(*aid), Some(aurl.clone())), + None => (None, None), + } + } else { + (None, None) + }; + + match extract_asset_text_with_endpoints(&item.asset, whisper_url_w.as_deref()).await { + Ok(extracted) => { + let trimmed = extracted.trim(); + if trimmed.len() < 80 { + tracing::warn!( + "ZIP async RAG: {} contenido insuficiente para RAG", + item.entry_name + ); + return (0, 0, true); } - Err((_, msg)) => { - failed_entries.push(format!("{}: rag ingest failed ({})", entry_name, msg)); + + let chunks = chunk_text(trimmed, 900); + if chunks.is_empty() { + tracing::warn!( + "ZIP async RAG: {} no genero chunks", + item.entry_name + ); + return (0, 0, true); } + + match ingest_chunks_to_question_bank( + &pool_w, + org_id_bg, + user_id_bg, + &item.asset, + source_kind, + skill, + &chunks, + &client_w, + &ollama_url_w, + &model_w, + linked_audio_id, + linked_audio_url, + item.unit_number, + ) + .await + { + Ok(()) => (1, chunks.len(), false), + Err((_, msg)) => { + tracing::warn!( + "ZIP async RAG: {} ingest fallo ({})", + item.entry_name, + msg + ); + (0, 0, true) + } + } + } + Err((_, msg)) => { + tracing::warn!( + "ZIP async RAG: {} extract fallo ({})", + item.entry_name, + msg + ); + (0, 0, true) + } + } + }); + } + + while let Some(result) = join_set.join_next().await { + match result { + Ok((assets_ok, chunks_ok, failed)) => { + ingested_assets += assets_ok; + ingested_chunks += chunks_ok; + processed_items += 1; + if failed { + failed_items += 1; + } + if let Some(tid) = task_id { + let progress = ((processed_items * 100) / queued_count.max(1)) as i32; + let _ = set_zip_rag_task_status( + &pool_bg, + tid, + "processing", + progress, + processed_items, + failed_items, + None, + ) + .await; + } + } + Err(e) => { + processed_items += 1; + failed_items += 1; + tracing::warn!("ZIP async RAG: worker fallo ({})", e); + if let Some(tid) = task_id { + let progress = ((processed_items * 100) / queued_count.max(1)) as i32; + let _ = set_zip_rag_task_status( + &pool_bg, + tid, + "processing", + progress, + processed_items, + failed_items, + None, + ) + .await; } } } - Err((_, msg)) => { - failed_entries.push(format!("{}: extract failed ({})", entry_name, msg)); - } } - } + + if let Some(tid) = task_id { + let final_status = if failed_items > 0 { "failed" } else { "completed" }; + let final_message = if failed_items > 0 { + Some("Uno o más archivos fallaron durante la extracción o la ingesta RAG") + } else { + None + }; + let _ = set_zip_rag_task_status( + &pool_bg, + tid, + final_status, + 100, + queued_count, + failed_items, + final_message, + ) + .await; + } + + tracing::info!( + "ZIP async RAG finalizado: {} assets, {} chunks (concurrency={})", + ingested_assets, + ingested_chunks, + rag_concurrency + ); + }); + + failed_entries.push(format!( + "Ingestion RAG iniciada en segundo plano para {} archivos. Puedes continuar usando el sistema mientras finaliza.", + queued_count + )); + rag_ingested_assets = 0; + rag_chunks_ingested = 0; } let _ = tokio::fs::remove_file(&zip_path).await; @@ -1377,6 +1820,8 @@ pub async fn import_assets_zip( rag_ingested_assets, rag_chunks_ingested, failed_entries, + rag_background_started, + rag_background_items, })) } @@ -1397,6 +1842,123 @@ fn replace_extension(filename: &str, new_ext: &str) -> String { format!("{}.{}", base, new_ext) } +fn replace_last_path_extension(path: &str, new_ext: &str) -> String { + if let Some((prefix, last)) = path.rsplit_once('/') { + return format!("{}/{}", prefix, replace_extension(last, new_ext)); + } + replace_extension(path, new_ext) +} + +fn build_public_url_from_storage_path(storage_path: &str) -> String { + if let Some((_, key)) = parse_s3_storage_path(storage_path) { + if let Some(settings) = get_s3_settings() { + return build_s3_public_url(&settings, key); + } + return storage_path.to_string(); + } + + let storage_filename = StdPath::new(storage_path) + .file_name() + .and_then(|s| s.to_str()) + .unwrap_or(""); + format!("/assets/{}", storage_filename) +} + +async fn normalize_flv_asset_for_rag( + pool: &PgPool, + asset: &mut Asset, +) -> Result<(), (StatusCode, String)> { + if !is_flv_media(&asset.filename, &asset.mimetype) { + return Ok(()); + } + + tokio::fs::create_dir_all("uploads/tmp") + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error creating temp dir: {}", e)))?; + + let input_path = format!("uploads/tmp/flv-normalize-in-{}.flv", asset.id); + let output_path = format!("uploads/tmp/flv-normalize-out-{}.mp4", asset.id); + + let source_bytes = read_storage_bytes(&asset.storage_path).await?; + tokio::fs::write(&input_path, source_bytes) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error writing temp FLV: {}", e)))?; + + if let Err(e) = transcode_flv_to_mp4(&input_path, &output_path).await { + let _ = tokio::fs::remove_file(&input_path).await; + let _ = tokio::fs::remove_file(&output_path).await; + return Err(e); + } + + let _ = tokio::fs::remove_file(&input_path).await; + + let output_bytes = tokio::fs::read(&output_path) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error reading temp MP4: {}", e)))?; + let _ = tokio::fs::remove_file(&output_path).await; + + let next_storage_path = replace_last_path_extension(&asset.storage_path, "mp4"); + if let Some((bucket, key)) = parse_s3_storage_path(&next_storage_path) { + let settings = get_s3_settings().ok_or(( + StatusCode::INTERNAL_SERVER_ERROR, + "S3 path detected but storage is not configured".to_string(), + ))?; + let client = build_s3_client(&settings).await?; + let old_storage_path = asset.storage_path.clone(); + + client + .put_object() + .bucket(bucket) + .key(key) + .content_type("video/mp4") + .body(output_bytes.clone().into()) + .send() + .await + .map_err(|e| (StatusCode::BAD_GATEWAY, format!("Error uploading normalized MP4 to S3: {}", e)))?; + + if old_storage_path != next_storage_path { + let _ = delete_storage_path(&old_storage_path).await; + } + } else { + tokio::fs::write(&next_storage_path, &output_bytes) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error writing normalized MP4: {}", e)))?; + + if asset.storage_path != next_storage_path { + let _ = tokio::fs::remove_file(&asset.storage_path).await; + } + } + + let next_filename = replace_extension(&asset.filename, "mp4"); + let next_size = output_bytes.len() as i64; + + sqlx::query( + r#" + UPDATE assets + SET filename = $1, + storage_path = $2, + mimetype = $3, + size_bytes = $4 + WHERE id = $5 + "#, + ) + .bind(&next_filename) + .bind(&next_storage_path) + .bind("video/mp4") + .bind(next_size) + .bind(asset.id) + .execute(pool) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error updating normalized asset: {}", e)))?; + + asset.filename = next_filename; + asset.storage_path = next_storage_path; + asset.mimetype = "video/mp4".to_string(); + asset.size_bytes = next_size; + + Ok(()) +} + async fn transcode_flv_to_mp4(input_path: &str, output_path: &str) -> Result<(), (StatusCode, String)> { let output = Command::new("ffmpeg") .arg("-y") @@ -1514,12 +2076,19 @@ async fn ingest_chunks_to_question_bank( } async fn extract_asset_text(asset: &Asset) -> Result { + extract_asset_text_with_endpoints(asset, None).await +} + +async fn extract_asset_text_with_endpoints( + asset: &Asset, + whisper_url_override: Option<&str>, +) -> Result { let lower_name = asset.filename.to_lowercase(); let mimetype = asset.mimetype.to_lowercase(); if mimetype.starts_with("audio/") || mimetype.starts_with("video/") { let bytes = read_storage_bytes(&asset.storage_path).await?; - return transcribe_media_bytes(bytes, &asset.filename).await; + return transcribe_media_bytes_with_override(bytes, &asset.filename, whisper_url_override).await; } if mimetype.contains("pdf") || lower_name.ends_with(".pdf") { @@ -1583,8 +2152,18 @@ async fn extract_pdf_text_from_bytes(bytes: Vec) -> Result, filename: &str) -> Result { +async fn transcribe_media_bytes_with_override( + file_data: Vec, + filename: &str, + whisper_url_override: Option<&str>, +) -> Result { let mut whisper_urls: Vec = Vec::new(); + if let Some(url) = whisper_url_override { + let trimmed = url.trim(); + if !trimmed.is_empty() { + whisper_urls.push(trimmed.trim_end_matches('/').to_string()); + } + } if let Ok(url) = std::env::var("WHISPER_URL") { let trimmed = url.trim(); if !trimmed.is_empty() { diff --git a/web/studio/src/app/admin/materials/page.tsx b/web/studio/src/app/admin/materials/page.tsx index 2cf6fbc..d8a896c 100644 --- a/web/studio/src/app/admin/materials/page.tsx +++ b/web/studio/src/app/admin/materials/page.tsx @@ -6,13 +6,14 @@ import { Upload, Database, FileArchive, CheckCircle2, AlertTriangle, Scissors } export default function AdminSharedMaterialsPage() { const [zipFile, setZipFile] = useState(null); - const [ingestRag, setIngestRag] = useState(true); + const [ingestRag, setIngestRag] = useState(false); const [englishLevel, setEnglishLevel] = useState(''); const [plans, setPlans] = useState([]); const [courses, setCourses] = useState([]); const [selectedPlanId, setSelectedPlanId] = useState(''); const [selectedCourseId, setSelectedCourseId] = useState(''); const [splitToRegular, setSplitToRegular] = useState(false); + const [useDevProcessing, setUseDevProcessing] = useState(false); const [regularPlanId, setRegularPlanId] = useState(''); const [regularCourses, setRegularCourses] = useState([]); const [selectedCourseIdR1, setSelectedCourseIdR1] = useState(''); @@ -27,6 +28,8 @@ export default function AdminSharedMaterialsPage() { rag_ingested_assets: number; rag_chunks_ingested: number; failed_entries: string[]; + rag_background_started?: boolean; + rag_background_items?: number; } | null>(null); const canUpload = useMemo(() => Boolean(zipFile) && !loading, [zipFile, loading]); @@ -159,6 +162,7 @@ export default function AdminSharedMaterialsPage() { splitToRegular, selectedCourseIdR1 || undefined, selectedCourseIdR2 || undefined, + useDevProcessing, ); setResult(response); setPhase('done'); @@ -218,7 +222,16 @@ export default function AdminSharedMaterialsPage() { checked={ingestRag} onChange={(e) => setIngestRag(e.target.checked)} /> - Ingerir automaticamente en RAG al importar + Ingerir automaticamente en RAG al importar (recomendado activar solo para ZIPs pequeños) + + +
@@ -396,7 +409,7 @@ export default function AdminSharedMaterialsPage() {

{statusText}

- Nota: esta importacion ZIP corre en la misma solicitud (no crea fila en Tasks), por eso aqui ves el estado en vivo. + Nota: la subida e importación base terminan en esta solicitud. Si activas RAG, su procesamiento puede continuar en segundo plano.

)} @@ -405,6 +418,12 @@ export default function AdminSharedMaterialsPage() { {result && (

Resultado de la Importacion

+ {result.rag_background_started && ( +
+ + RAG en segundo plano: {result.rag_background_items ?? 0} archivos en procesamiento +
+ )}
diff --git a/web/studio/src/app/admin/tasks/page.tsx b/web/studio/src/app/admin/tasks/page.tsx index 0543ed3..42ec662 100644 --- a/web/studio/src/app/admin/tasks/page.tsx +++ b/web/studio/src/app/admin/tasks/page.tsx @@ -98,6 +98,10 @@ export default function BackgroundTasksPage() { label = 'Transcription'; color = 'bg-purple-100 text-purple-800'; break; + case 'zip_rag_import': + label = 'ZIP RAG'; + color = 'bg-indigo-100 text-indigo-800'; + break; case 'lesson_image': label = 'Lesson Image'; color = 'bg-blue-100 text-blue-800'; @@ -165,7 +169,7 @@ export default function BackgroundTasksPage() {
({format(new Date(task.updated_at), 'yyyy')})
- {task.status === 'failed' && ( + {task.task_type === 'lesson_transcription' && task.status === 'failed' && ( )} - + {task.task_type === 'lesson_transcription' && ( + + )} ))} diff --git a/web/studio/src/lib/api.ts b/web/studio/src/lib/api.ts index 526d442..bf650f9 100644 --- a/web/studio/src/lib/api.ts +++ b/web/studio/src/lib/api.ts @@ -755,6 +755,8 @@ export interface AssetZipImportResult { rag_ingested_assets: number; rag_chunks_ingested: number; failed_entries: string[]; + rag_background_started?: boolean; + rag_background_items?: number; } export interface Cohort { @@ -1130,6 +1132,7 @@ export const cmsApi = { splitToRegular = false, samCourseIdR1?: number, samCourseIdR2?: number, + useDevProcessing = false, ): Promise => { return new Promise((resolve, reject) => { const maxNetworkRetries = 2; @@ -1147,6 +1150,9 @@ export const cmsApi = { if (samCourseIdR1) formData.append('sam_course_id_r1', String(samCourseIdR1)); if (samCourseIdR2) formData.append('sam_course_id_r2', String(samCourseIdR2)); } + if (useDevProcessing) { + formData.append('use_dev_processing', 'true'); + } const xhr = new XMLHttpRequest(); xhr.open('POST', `${API_BASE_URL}/api/assets/import-zip`); @@ -1819,7 +1825,7 @@ export interface BackgroundTask { id: string; title: string; course_title?: string; - task_type: 'lesson_transcription' | 'lesson_image' | 'course_image'; + task_type: 'lesson_transcription' | 'lesson_image' | 'course_image' | 'zip_rag_import'; status: 'idle' | 'queued' | 'processing' | 'failed' | 'completed' | 'error'; progress: number; updated_at: string;