diff --git a/services/cms-service/src/handlers/tasks.rs b/services/cms-service/src/handlers/tasks.rs index ef659c4..e834560 100644 --- a/services/cms-service/src/handlers/tasks.rs +++ b/services/cms-service/src/handlers/tasks.rs @@ -1,4 +1,5 @@ use crate::handlers::run_transcription_task; +use crate::handlers_assets::{ingest_asset_for_rag_core, set_zip_rag_task_status}; use axum::{ Json, extract::{Path, State}, @@ -6,6 +7,7 @@ use axum::{ }; use serde::Serialize; use sqlx::{FromRow, PgPool}; +use tokio::time::Duration; use uuid::Uuid; #[derive(Debug, Serialize, FromRow)] @@ -74,11 +76,24 @@ struct LessonStatusRow { transcription_status: Option, } +#[derive(sqlx::FromRow)] +struct ZipRetryTaskRow { + id: Uuid, + organization_id: Uuid, + created_by: Uuid, + metadata: serde_json::Value, + created_at: chrono::DateTime, +} + +#[derive(sqlx::FromRow)] +struct AssetIdRow { + id: Uuid, +} + pub async fn retry_task( State(pool): State, Path(id): Path, ) -> Result { - // Check lessons for transcription failures let lesson = sqlx::query_as::<_, LessonStatusRow>("SELECT transcription_status FROM lessons WHERE id = $1") .bind(id) @@ -97,6 +112,176 @@ pub async fn retry_task( } } + let zip_task = sqlx::query_as::<_, ZipRetryTaskRow>( + r#" + SELECT id, organization_id, created_by, metadata, created_at + FROM background_tasks + WHERE id = $1 + AND task_type = 'zip_rag_import' + AND status = 'failed' + "#, + ) + .bind(id) + .fetch_optional(&pool) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + + if let Some(task) = zip_task { + let zip_batch_id_from_metadata = task + .metadata + .get("zip_batch_id") + .and_then(|v| v.as_str()) + .map(|raw| { + Uuid::parse_str(raw).map_err(|_| { + ( + StatusCode::BAD_REQUEST, + "zip_batch_id inválido en metadata de la tarea".to_string(), + ) + }) + }) + .transpose()?; + + let zip_batch_id = if let Some(id) = zip_batch_id_from_metadata { + id + } else { + #[derive(sqlx::FromRow)] + struct ZipBatchIdRow { + zip_batch_id: Uuid, + } + + let candidates = sqlx::query_as::<_, ZipBatchIdRow>( + r#" + SELECT zip_batch_id + FROM assets + WHERE organization_id = $1 + AND uploaded_by = $2 + AND zip_batch_id IS NOT NULL + AND created_at BETWEEN ($3 - INTERVAL '30 minutes') AND ($3 + INTERVAL '30 minutes') + GROUP BY zip_batch_id + ORDER BY COUNT(*) DESC, MAX(created_at) DESC + "#, + ) + .bind(task.organization_id) + .bind(task.created_by) + .bind(task.created_at) + .fetch_all(&pool) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + + if candidates.len() == 1 { + candidates[0].zip_batch_id + } else if candidates.is_empty() { + return Err(( + StatusCode::BAD_REQUEST, + "La tarea no tiene zip_batch_id y no se pudo inferir un lote asociado. Vuelve a cargar el ZIP para reingestar.".to_string(), + )); + } else { + return Err(( + StatusCode::BAD_REQUEST, + "La tarea no tiene zip_batch_id y hay múltiples lotes candidatos; vuelve a cargar el ZIP para evitar ambigüedad.".to_string(), + )); + } + }; + + let assets = sqlx::query_as::<_, AssetIdRow>( + "SELECT id FROM assets WHERE organization_id = $1 AND zip_batch_id = $2 ORDER BY created_at ASC", + ) + .bind(task.organization_id) + .bind(zip_batch_id) + .fetch_all(&pool) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + + if assets.is_empty() { + return Err(( + StatusCode::BAD_REQUEST, + "No se encontraron assets para ese zip_batch_id".to_string(), + )); + } + + let pool_clone = pool.clone(); + tokio::spawn(async move { + let total = assets.len(); + let mut processed = 0usize; + let mut failed = 0usize; + + let _ = set_zip_rag_task_status(&pool_clone, task.id, "processing", 0, 0, 0, None).await; + + for row in assets { + let ingest_result = tokio::time::timeout( + Duration::from_secs(120), + ingest_asset_for_rag_core(&pool_clone, task.organization_id, task.created_by, row.id), + ) + .await; + + match ingest_result { + Ok(Ok(_)) => { + processed += 1; + } + Ok(Err(msg)) => { + failed += 1; + tracing::warn!( + "Retry ZIP RAG: fallo ingesta asset {} en task {}: {}", + row.id, + task.id, + msg + ); + } + Err(_elapsed) => { + failed += 1; + tracing::warn!( + "Retry ZIP RAG: timeout (120s) en asset {} en task {}", + row.id, + task.id + ); + } + } + + let progress = (((processed + failed) as f32 / total as f32) * 100.0) + .round() + .clamp(0.0, 100.0) as i32; + + let _ = set_zip_rag_task_status( + &pool_clone, + task.id, + "processing", + progress, + processed, + failed, + None, + ) + .await; + } + + let final_status = if failed > 0 { "failed" } else { "completed" }; + let final_msg = if failed > 0 { + Some("Reintento RAG completado con errores parciales") + } else { + None + }; + if let Err(e) = set_zip_rag_task_status( + &pool_clone, + task.id, + final_status, + 100, + processed, + failed, + final_msg, + ) + .await + { + tracing::error!( + "Retry ZIP RAG: no se pudo actualizar estado final de task {} a '{}': {:?}", + task.id, + final_status, + e + ); + } + }); + + return Ok(StatusCode::ACCEPTED); + } + Ok(StatusCode::NOT_FOUND) } diff --git a/services/cms-service/src/handlers_assets.rs b/services/cms-service/src/handlers_assets.rs index 502cea2..7033919 100644 --- a/services/cms-service/src/handlers_assets.rs +++ b/services/cms-service/src/handlers_assets.rs @@ -680,39 +680,33 @@ pub async fn delete_asset( Ok(StatusCode::NO_CONTENT) } -/// POST /api/assets/:id/ingest-rag - Ingesta un asset (PDF/audio/video/texto) en chunks para RAG -pub async fn ingest_asset_for_rag( - Org(org_ctx): Org, - claims: Claims, - State(pool): State, - Path(id): Path, -) -> Result, (StatusCode, String)> { - let asset: Asset = sqlx::query_as( - "SELECT * FROM assets WHERE id = $1 AND organization_id = $2" - ) - .bind(id) - .bind(org_ctx.id) - .fetch_optional(&pool) - .await - .map_err(|e: sqlx::Error| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? - .ok_or((StatusCode::NOT_FOUND, "Activo no encontrado".to_string()))?; +/// Lógica interna de ingesta RAG reutilizable (también usada en retry de tareas). +pub async fn ingest_asset_for_rag_core( + pool: &PgPool, + org_id: Uuid, + user_id: Uuid, + asset_id: Uuid, +) -> Result<(usize, usize), String> { + let asset: Asset = sqlx::query_as("SELECT * FROM assets WHERE id = $1 AND organization_id = $2") + .bind(asset_id) + .bind(org_id) + .fetch_optional(pool) + .await + .map_err(|e| e.to_string())? + .ok_or_else(|| "Activo no encontrado".to_string())?; - let extracted = extract_asset_text(&asset).await?; - let content = extracted.trim(); + let extracted = extract_asset_text(&asset) + .await + .map_err(|(_, msg)| msg)?; + let content = extracted.trim().to_string(); if content.len() < 80 { - return Err(( - StatusCode::BAD_REQUEST, - "No se encontró suficiente texto utilizable en el archivo".to_string(), - )); + return Err("No se encontró suficiente texto utilizable en el archivo".to_string()); } - let chunks = chunk_text(content, 900); + let chunks = chunk_text(&content, 900); if chunks.is_empty() { - return Err(( - StatusCode::BAD_REQUEST, - "No se pudo generar contenido para RAG".to_string(), - )); + return Err("No se pudo generar contenido para RAG".to_string()); } sqlx::query( @@ -723,11 +717,11 @@ pub async fn ingest_asset_for_rag( AND source_metadata->>'asset_id' = $2 "#, ) - .bind(org_ctx.id) + .bind(org_id) .bind(asset.id.to_string()) - .execute(&pool) + .execute(pool) .await - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Cleanup failed: {}", e)))?; + .map_err(|e| format!("Cleanup failed: {}", e))?; let source_kind = if asset.mimetype.starts_with("audio/") || asset.mimetype.starts_with("video/") { "audio-transcription" @@ -747,16 +741,16 @@ pub async fn ingest_asset_for_rag( .danger_accept_invalid_certs(true) .danger_accept_invalid_hostnames(true) .build() - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("HTTP client error: {}", e)))?; + .map_err(|e| format!("HTTP client error: {}", e))?; let ollama_url = ai::get_ollama_url(); let model = ai::get_embedding_model(); ingest_chunks_to_question_bank( - &pool, - org_ctx.id, - claims.sub, + pool, + org_id, + user_id, &asset, - &source_kind, + source_kind, skill, &chunks, &client, @@ -766,13 +760,51 @@ pub async fn ingest_asset_for_rag( None, asset.unit_number, ) - .await?; + .await + .map_err(|(_, msg)| msg)?; + + Ok((chunks.len(), content.len())) +} + +/// POST /api/assets/:id/ingest-rag - Ingesta un asset (PDF/audio/video/texto) en chunks para RAG +pub async fn ingest_asset_for_rag( + Org(org_ctx): Org, + claims: Claims, + State(pool): State, + Path(id): Path, +) -> Result, (StatusCode, String)> { + let (chunks_ingested, chars_ingested) = ingest_asset_for_rag_core(&pool, org_ctx.id, claims.sub, id) + .await + .map_err(|e| { + if e.contains("Activo no encontrado") { + (StatusCode::NOT_FOUND, e) + } else if e.contains("suficiente texto") || e.contains("generar contenido") { + (StatusCode::BAD_REQUEST, e) + } else { + (StatusCode::INTERNAL_SERVER_ERROR, e) + } + })?; + + let asset: Asset = sqlx::query_as("SELECT * FROM assets WHERE id = $1 AND organization_id = $2") + .bind(id) + .bind(org_ctx.id) + .fetch_one(&pool) + .await + .map_err(|e: sqlx::Error| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + + let source_kind = if asset.mimetype.starts_with("audio/") || asset.mimetype.starts_with("video/") { + "audio-transcription" + } else if asset.mimetype.contains("pdf") { + "pdf" + } else { + "text" + }; Ok(Json(AssetRagIngestResponse { asset_id: asset.id, source: source_kind.to_string(), - chunks_ingested: chunks.len(), - chars_ingested: content.len(), + chunks_ingested, + chars_ingested, })) } @@ -825,6 +857,7 @@ async fn create_zip_rag_background_task( org_id: Uuid, user_id: Uuid, course_id: Option, + zip_batch_id: Uuid, total_items: usize, ) -> Result { let task_id = Uuid::new_v4(); @@ -865,7 +898,7 @@ async fn create_zip_rag_background_task( $5, 0, 0, - '{}'::jsonb, + jsonb_build_object('zip_batch_id', $6::text), NOW(), NOW() ) @@ -876,13 +909,14 @@ async fn create_zip_rag_background_task( .bind(user_id) .bind(course_title) .bind(total_items as i32) + .bind(zip_batch_id.to_string()) .execute(pool) .await?; Ok(task_id) } -async fn set_zip_rag_task_status( +pub async fn set_zip_rag_task_status( pool: &PgPool, task_id: Uuid, status: &str, @@ -1739,6 +1773,7 @@ pub async fn import_assets_zip( org_ctx.id, claims.sub, course_id, + zip_batch_id, queued_count, ) .await diff --git a/web/studio/src/app/admin/tasks/page.tsx b/web/studio/src/app/admin/tasks/page.tsx index 42ec662..e347dc2 100644 --- a/web/studio/src/app/admin/tasks/page.tsx +++ b/web/studio/src/app/admin/tasks/page.tsx @@ -169,7 +169,7 @@ export default function BackgroundTasksPage() {
({format(new Date(task.updated_at), 'yyyy')})
- {task.task_type === 'lesson_transcription' && task.status === 'failed' && ( + {(task.task_type === 'lesson_transcription' || task.task_type === 'zip_rag_import') && task.status === 'failed' && (