From 254900746d05e20e63b06c823583a0d4733676aa Mon Sep 17 00:00:00 2001 From: Nurfog Date: Fri, 17 Apr 2026 17:05:46 -0400 Subject: [PATCH] =?UTF-8?q?feat:=20agregar=20configuraci=C3=B3n=20para=20a?= =?UTF-8?q?lmacenamiento=20remoto=20de=20ZIP=20en=20modo=20DEV=20y=20mejor?= =?UTF-8?q?ar=20la=20gesti=C3=B3n=20de=20l=C3=ADmites=20de=20tama=C3=B1o?= =?UTF-8?q?=20de=20archivos=20en=20la=20importaci=C3=B3n=20de=20ZIP?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .env.dev.example | 9 + services/cms-service/src/handlers_assets.rs | 248 ++++++++++++++---- .../lms-service/src/handlers_discussions.rs | 8 +- 3 files changed, 211 insertions(+), 54 deletions(-) diff --git a/.env.dev.example b/.env.dev.example index 024e75c..e678644 100644 --- a/.env.dev.example +++ b/.env.dev.example @@ -57,5 +57,14 @@ LOCAL_LLM_MODEL=llama3.2:3b # Workers para procesamiento RAG asíncrono post-ZIP ("túneles"): 1..12 ZIP_RAG_CONCURRENCY=5 +# Opcional: almacenamiento remoto para ZIP en modo DEV (cuando marcas "Procesar este ZIP con infraestructura DEV") +# Si defines DEV_S3_BUCKET, el ZIP se descomprime/procesa localmente y los archivos listos +# (incluyendo FLV ya convertido a MP4) se suben a este S3 remoto para consumo RAG. +DEV_S3_BUCKET= +DEV_S3_REGION=us-east-2 +DEV_S3_ENDPOINT= +DEV_S3_PUBLIC_BASE_URL= +DEV_S3_FORCE_PATH_STYLE=false + # Backend-to-backend (LMS -> CMS) CMS_API_URL=http://studio:3001 diff --git a/services/cms-service/src/handlers_assets.rs b/services/cms-service/src/handlers_assets.rs index 2a50354..3cce5a8 100644 --- a/services/cms-service/src/handlers_assets.rs +++ b/services/cms-service/src/handlers_assets.rs @@ -25,6 +25,26 @@ use tokio::process::Command; use tokio::io::AsyncWriteExt; use tokio::task::JoinSet; +const DEFAULT_ZIP_IMPORT_MAX_UPLOAD_BYTES: u64 = 512 * 1024 * 1024; // 512 MiB +const DEFAULT_ZIP_IMPORT_MAX_ENTRY_BYTES: u64 = 64 * 1024 * 1024; // 64 MiB por archivo +const DEFAULT_ZIP_IMPORT_MAX_TOTAL_BYTES: u64 = 1024 * 1024 * 1024; // 1 GiB descomprimido + +fn read_env_u64_with_bounds(name: &str, default: u64, min: u64, max: u64) -> u64 { + env::var(name) + .ok() + .and_then(|v| v.trim().parse::().ok()) + .map(|v| v.clamp(min, max)) + .unwrap_or(default) +} + +fn read_env_usize_with_bounds(name: &str, default: usize, min: usize, max: usize) -> usize { + env::var(name) + .ok() + .and_then(|v| v.trim().parse::().ok()) + .map(|v| v.clamp(min, max)) + .unwrap_or(default) +} + #[derive(Debug, Serialize)] pub struct AssetUploadResponse { pub id: Uuid, @@ -73,22 +93,20 @@ struct S3Settings { force_path_style: bool, } -fn get_s3_settings() -> Option { - let enabled = env::var("ASSETS_STORAGE") - .unwrap_or_else(|_| "local".to_string()) - .to_lowercase(); +fn load_s3_settings_from_env(prefix: &str) -> Option { + let bucket_key = format!("{}S3_BUCKET", prefix); + let region_key = format!("{}S3_REGION", prefix); + let endpoint_key = format!("{}S3_ENDPOINT", prefix); + let public_base_key = format!("{}S3_PUBLIC_BASE_URL", prefix); + let force_path_key = format!("{}S3_FORCE_PATH_STYLE", prefix); - if enabled != "s3" { - return None; - } - - let bucket = env::var("S3_BUCKET").ok()?; - let region = env::var("S3_REGION").unwrap_or_else(|_| "us-east-2".to_string()); - let endpoint = env::var("S3_ENDPOINT").ok().filter(|v| !v.trim().is_empty()); - let public_base_url = env::var("S3_PUBLIC_BASE_URL") + let bucket = env::var(&bucket_key).ok()?; + let region = env::var(®ion_key).unwrap_or_else(|_| "us-east-2".to_string()); + let endpoint = env::var(&endpoint_key).ok().filter(|v| !v.trim().is_empty()); + let public_base_url = env::var(&public_base_key) .ok() .filter(|v| !v.trim().is_empty()); - let force_path_style = env::var("S3_FORCE_PATH_STYLE") + let force_path_style = env::var(&force_path_key) .map(|v| { let lower = v.to_lowercase(); lower == "1" || lower == "true" || lower == "yes" @@ -104,6 +122,38 @@ fn get_s3_settings() -> Option { }) } +fn get_s3_settings() -> Option { + let enabled = env::var("ASSETS_STORAGE") + .unwrap_or_else(|_| "local".to_string()) + .to_lowercase(); + + if enabled != "s3" { + return None; + } + + load_s3_settings_from_env("") +} + +fn get_dev_s3_settings() -> Option { + load_s3_settings_from_env("DEV_") +} + +fn get_s3_settings_for_bucket(bucket: &str) -> Option { + if let Some(default) = get_s3_settings() { + if default.bucket == bucket { + return Some(default); + } + } + + if let Some(dev) = get_dev_s3_settings() { + if dev.bucket == bucket { + return Some(dev); + } + } + + None +} + async fn build_s3_client(settings: &S3Settings) -> Result { let region_provider = RegionProviderChain::first_try(Some(Region::new(settings.region.clone()))) .or_default_provider(); @@ -213,7 +263,7 @@ async fn push_bytes_to_s3( async fn delete_storage_path(storage_path: &str) -> Result<(), (StatusCode, String)> { if let Some((bucket, key)) = parse_s3_storage_path(storage_path) { - let settings = get_s3_settings().ok_or(( + let settings = get_s3_settings_for_bucket(bucket).ok_or(( StatusCode::INTERNAL_SERVER_ERROR, "Se encontró una ruta de almacenamiento S3 pero S3 no está configurado".to_string(), ))?; @@ -232,6 +282,12 @@ async fn delete_storage_path(storage_path: &str) -> Result<(), (StatusCode, Stri Ok(()) } +async fn cleanup_local_temp_file(storage_path: &str) { + if !storage_path.starts_with("s3://") { + let _ = tokio::fs::remove_file(storage_path).await; + } +} + fn parse_s3_storage_path(path: &str) -> Option<(&str, &str)> { let without_prefix = path.strip_prefix("s3://")?; let (bucket, key) = without_prefix.split_once('/')?; @@ -255,7 +311,7 @@ pub async fn public_s3_proxy( .cloned() .ok_or((StatusCode::BAD_REQUEST, "Falta la clave (key)".to_string()))?; - let settings = get_s3_settings().ok_or(( + let settings = get_s3_settings_for_bucket(&bucket).ok_or(( StatusCode::NOT_FOUND, "El almacenamiento S3 no está configurado".to_string(), ))?; @@ -286,7 +342,7 @@ pub async fn public_s3_proxy( async fn read_storage_bytes(storage_path: &str) -> Result, (StatusCode, String)> { if let Some((bucket, key)) = parse_s3_storage_path(storage_path) { - let settings = get_s3_settings().ok_or(( + let settings = get_s3_settings_for_bucket(bucket).ok_or(( StatusCode::INTERNAL_SERVER_ERROR, "S3 storage path found but S3 is not configured".to_string(), ))?; @@ -856,28 +912,51 @@ async fn process_zip_entry_without_rag( let (storage_path, stored_filename, mimetype) = if is_flv { if use_dev_processing && ingest_rag { - let storage_path = build_ready_for_rag_path(org_id, asset_id, &format!("{}.flv", asset_id)); + let temp_storage_filename = format!("{}.flv", asset_id); + let temp_storage_path = format!("uploads/tmp/{}", temp_storage_filename); + tokio::fs::create_dir_all("uploads/tmp") + .await + .map_err(|e| format!("{}: Error creating temp dir ({})", entry_name, e))?; + tokio::fs::write(&temp_storage_path, &content) + .await + .map_err(|e| format!("{}: Error en la escritura local ({})", entry_name, e))?; + + let storage_path = build_ready_for_rag_path(org_id, asset_id, &format!("{}.mp4", asset_id)); tokio::fs::create_dir_all(StdPath::new(&storage_path).parent().unwrap_or(StdPath::new("."))) .await .map_err(|e| format!("{}: Error creating ready-for-rag dir ({})", entry_name, e))?; - tokio::fs::write(&storage_path, &content) - .await - .map_err(|e| format!("{}: Error en la escritura local ({})", entry_name, e))?; + + if let Err((_, msg)) = transcode_flv_to_mp4(&temp_storage_path, &storage_path).await { + let _ = tokio::fs::remove_file(&temp_storage_path).await; + return Err(format!("{}: la transcodificación de flv falló ({})", entry_name, msg)); + } + let _ = tokio::fs::remove_file(&temp_storage_path).await; ( storage_path, - safe_filename.clone(), - if guessed_mimetype.is_empty() { "video/x-flv".to_string() } else { guessed_mimetype.clone() }, + replace_extension(&safe_filename, "mp4"), + "video/mp4".to_string(), ) } else if use_dev_processing { - let storage_filename = format!("{}.flv", asset_id); - let storage_path = format!("uploads/{}", storage_filename); - tokio::fs::write(&storage_path, &content) + let temp_storage_filename = format!("{}.flv", asset_id); + let temp_storage_path = format!("uploads/tmp/{}", temp_storage_filename); + tokio::fs::create_dir_all("uploads/tmp") + .await + .map_err(|e| format!("{}: Error creating temp dir ({})", entry_name, e))?; + tokio::fs::write(&temp_storage_path, &content) .await .map_err(|e| format!("{}: Error en la escritura local ({})", entry_name, e))?; + + let final_storage_filename = format!("{}.mp4", asset_id); + let final_storage_path = format!("uploads/{}", final_storage_filename); + if let Err((_, msg)) = transcode_flv_to_mp4(&temp_storage_path, &final_storage_path).await { + let _ = tokio::fs::remove_file(&temp_storage_path).await; + return Err(format!("{}: la transcodificación de flv falló ({})", entry_name, msg)); + } + let _ = tokio::fs::remove_file(&temp_storage_path).await; ( - storage_path, - safe_filename.clone(), - if guessed_mimetype.is_empty() { "video/x-flv".to_string() } else { guessed_mimetype.clone() }, + final_storage_path, + replace_extension(&safe_filename, "mp4"), + "video/mp4".to_string(), ) } else { let temp_storage_filename = format!("{}.flv", asset_id); @@ -948,9 +1027,7 @@ async fn process_zip_entry_without_rag( .await .map_err(|(_, msg)| format!("{}: s3 upload failed ({})", entry_name, msg))?; - if is_flv { - let _ = tokio::fs::remove_file(&storage_path).await; - } + cleanup_local_temp_file(&storage_path).await; (s3_path, uploaded_len, public_url) } else { @@ -1006,6 +1083,25 @@ pub async fn import_assets_zip( State(pool): State, mut multipart: Multipart, ) -> Result, (StatusCode, String)> { + let max_upload_bytes = read_env_u64_with_bounds( + "ZIP_IMPORT_MAX_UPLOAD_BYTES", + DEFAULT_ZIP_IMPORT_MAX_UPLOAD_BYTES, + 1, + 10 * 1024 * 1024 * 1024, + ); + let max_entry_bytes = read_env_u64_with_bounds( + "ZIP_IMPORT_MAX_ENTRY_BYTES", + DEFAULT_ZIP_IMPORT_MAX_ENTRY_BYTES, + 1, + 2 * 1024 * 1024 * 1024, + ); + let max_total_uncompressed_bytes = read_env_u64_with_bounds( + "ZIP_IMPORT_MAX_TOTAL_BYTES", + DEFAULT_ZIP_IMPORT_MAX_TOTAL_BYTES, + 1, + 20 * 1024 * 1024 * 1024, + ); + let mut zip_temp_path: Option = None; let mut course_id: Option = None; let mut english_level: Option = None; @@ -1033,12 +1129,25 @@ pub async fn import_assets_zip( let mut temp_file = tokio::fs::File::create(&temp_name) .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to create temp zip file: {}", e)))?; + let mut received_bytes: u64 = 0; while let Some(chunk) = field .chunk() .await .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to read upload chunk: {}", e)))? { + received_bytes = received_bytes.saturating_add(chunk.len() as u64); + if received_bytes > max_upload_bytes { + let _ = tokio::fs::remove_file(&temp_name).await; + return Err(( + StatusCode::PAYLOAD_TOO_LARGE, + format!( + "ZIP demasiado grande (>{} bytes). Ajusta ZIP_IMPORT_MAX_UPLOAD_BYTES si necesitas permitir más tamaño.", + max_upload_bytes + ), + )); + } + temp_file .write_all(&chunk) .await @@ -1120,13 +1229,14 @@ pub async fn import_assets_zip( .map_err(|_| (StatusCode::BAD_REQUEST, "Invalid ZIP file".to_string()))?; if archive.is_empty() { - let _ = tokio::fs::remove_file(&zip_path).await; + let _ = std::fs::remove_file(&zip_path); return Err((StatusCode::BAD_REQUEST, "No ZIP file uploaded".to_string())); } // ── Phase 1: collect all ZIP entries into memory ────────────────────────── let mut all_entries: Vec = Vec::new(); let mut unit_set: std::collections::BTreeSet = Default::default(); + let mut total_uncompressed_bytes: u64 = 0; let len = archive.len(); for i in 0..len { @@ -1141,6 +1251,32 @@ pub async fn import_assets_zip( if entry_name.starts_with("__MACOSX/") || entry_name.ends_with(".DS_Store") { continue; } + + let declared_entry_size = file.size(); + if declared_entry_size > max_entry_bytes { + let _ = std::fs::remove_file(&zip_path); + return Err(( + StatusCode::PAYLOAD_TOO_LARGE, + format!( + "Entrada ZIP demasiado grande: {} ({} bytes). Límite actual por archivo: {} bytes (ZIP_IMPORT_MAX_ENTRY_BYTES).", + entry_name, + declared_entry_size, + max_entry_bytes + ), + )); + } + total_uncompressed_bytes = total_uncompressed_bytes.saturating_add(declared_entry_size); + if total_uncompressed_bytes > max_total_uncompressed_bytes { + let _ = std::fs::remove_file(&zip_path); + return Err(( + StatusCode::PAYLOAD_TOO_LARGE, + format!( + "El ZIP excede el límite descomprimido total ({} bytes). Ajusta ZIP_IMPORT_MAX_TOTAL_BYTES para permitir más.", + max_total_uncompressed_bytes + ), + )); + } + let safe_filename = StdPath::new(&entry_name) .file_name() .and_then(|s| s.to_str()) @@ -1175,6 +1311,9 @@ pub async fn import_assets_zip( }); } + // ZipArchive usa tipos no-Send; se libera antes de cualquier await posterior. + drop(archive); + // ── Phase 1b: calculate split midpoint (intensive → 2 regular courses) ─── // For 8-10 units: first half → regular 1, second half → regular 2. // Mid is the last unit number that goes to regular 1 (ceiling of N/2). @@ -1193,6 +1332,8 @@ pub async fn import_assets_zip( // Sort: audio/video first so their asset IDs are known when text is ingested all_entries.sort_by_key(|e| if e.is_audio_video { 0usize } else { 1 }); + // El modo DEV solo cambia endpoints de IA/procesamiento. + // El almacenamiento de assets del ZIP siempre usa el S3 del proyecto. let s3_settings = get_s3_settings(); let s3_client = if let Some(settings) = &s3_settings { Some(build_s3_client(settings).await?) @@ -1324,18 +1465,31 @@ pub async fn import_assets_zip( let (storage_path, stored_filename, mimetype) = if is_flv { if use_dev_processing { - let storage_path = build_ready_for_rag_path(org_ctx.id, asset_id, &format!("{}.flv", asset_id)); - tokio::fs::create_dir_all(StdPath::new(&storage_path).parent().unwrap_or(StdPath::new("."))) + let temp_storage_filename = format!("{}.flv", asset_id); + let temp_storage_path = format!("uploads/tmp/{}", temp_storage_filename); + tokio::fs::create_dir_all("uploads/tmp") .await - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error creating ready-for-rag dir: {}", e)))?; - if let Err(e) = tokio::fs::write(&storage_path, &content).await { + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error creating temp dir: {}", e)))?; + if let Err(e) = tokio::fs::write(&temp_storage_path, &content).await { failed_entries.push(format!("{}: local write failed ({})", entry_name, e)); continue; } + + let storage_path = build_ready_for_rag_path(org_ctx.id, asset_id, &format!("{}.mp4", asset_id)); + tokio::fs::create_dir_all(StdPath::new(&storage_path).parent().unwrap_or(StdPath::new("."))) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error creating ready-for-rag dir: {}", e)))?; + + if let Err((_, msg)) = transcode_flv_to_mp4(&temp_storage_path, &storage_path).await { + let _ = tokio::fs::remove_file(&temp_storage_path).await; + failed_entries.push(format!("{}: flv transcode failed ({})", entry_name, msg)); + continue; + } + let _ = tokio::fs::remove_file(&temp_storage_path).await; ( storage_path, - safe_filename.clone(), - if guessed_mimetype.is_empty() { "video/x-flv".to_string() } else { guessed_mimetype.clone() }, + replace_extension(&safe_filename, "mp4"), + "video/mp4".to_string(), ) } else { let temp_storage_filename = format!("{}.flv", asset_id); @@ -1407,15 +1561,11 @@ pub async fn import_assets_zip( match push_bytes_to_s3(client, settings, &key, &mimetype, upload_bytes).await { Ok((s3_path, public_url)) => { - if is_flv { - let _ = tokio::fs::remove_file(&storage_path).await; - } + cleanup_local_temp_file(&storage_path).await; (s3_path, public_url) } Err((_, msg)) => { - if is_flv { - let _ = tokio::fs::remove_file(&storage_path).await; - } + cleanup_local_temp_file(&storage_path).await; failed_entries.push(format!("{}: s3 upload failed ({})", entry_name, msg)); continue; } @@ -1850,8 +2000,8 @@ fn replace_last_path_extension(path: &str, new_ext: &str) -> String { } fn build_public_url_from_storage_path(storage_path: &str) -> String { - if let Some((_, key)) = parse_s3_storage_path(storage_path) { - if let Some(settings) = get_s3_settings() { + if let Some((bucket, key)) = parse_s3_storage_path(storage_path) { + if let Some(settings) = get_s3_settings_for_bucket(bucket) { return build_s3_public_url(&settings, key); } return storage_path.to_string(); @@ -1899,7 +2049,7 @@ async fn normalize_flv_asset_for_rag( let next_storage_path = replace_last_path_extension(&asset.storage_path, "mp4"); if let Some((bucket, key)) = parse_s3_storage_path(&next_storage_path) { - let settings = get_s3_settings().ok_or(( + let settings = get_s3_settings_for_bucket(bucket).ok_or(( StatusCode::INTERNAL_SERVER_ERROR, "S3 path detected but storage is not configured".to_string(), ))?; @@ -1960,8 +2110,12 @@ async fn normalize_flv_asset_for_rag( } async fn transcode_flv_to_mp4(input_path: &str, output_path: &str) -> Result<(), (StatusCode, String)> { + let ffmpeg_threads = read_env_usize_with_bounds("ZIP_FFMPEG_THREADS", 1, 1, 8); + let output = Command::new("ffmpeg") .arg("-y") + .arg("-threads") + .arg(ffmpeg_threads.to_string()) .arg("-i") .arg(input_path) .arg("-c:v") diff --git a/services/lms-service/src/handlers_discussions.rs b/services/lms-service/src/handlers_discussions.rs index adbda19..8cd3f70 100644 --- a/services/lms-service/src/handlers_discussions.rs +++ b/services/lms-service/src/handlers_discussions.rs @@ -26,7 +26,6 @@ struct ForumEmailRecipient { struct EmailTemplate { subject_template: String, body_template: String, - is_html: bool, is_enabled: bool, } @@ -155,12 +154,9 @@ async fn load_org_smtp_config(pool: &PgPool, organization_id: Uuid) -> Option Option { - let cms_api_url = env::var("CMS_API_URL").unwrap_or_else(|_| "http://localhost:3001".to_string()); - let url = format!("{}/organization/email-templates", cms_api_url); - // Para simplificar, por ahora devolvemos plantillas hardcoded // En producción, haríamos la llamada HTTP con autenticación match template_key { @@ -177,7 +173,6 @@ Ver hilo completo: {{thread_url}} Saludos, El equipo de {{organization_name}}".to_string(), - is_html: false, is_enabled: true, }), "forum_thread" => Some(EmailTemplate { @@ -193,7 +188,6 @@ Ver hilo: {{thread_url}} Saludos, El equipo de {{organization_name}}".to_string(), - is_html: false, is_enabled: true, }), _ => None,