feat: Implement ZIP RAG import functionality with background processing

- Added support for ZIP RAG import in the asset management system.
- Introduced a new background task type for ZIP RAG imports.
- Enhanced the asset import process to allow for optional development processing.
- Updated the UI to reflect the new RAG processing status and options.
- Created database migration for background tasks related to ZIP RAG imports.
- Refactored asset handling to support FLV normalization and improved error handling.
- Added new API endpoints and updated existing ones to accommodate changes.
This commit is contained in:
2026-04-17 12:51:50 -04:00
parent ccea101a5e
commit a3467d22d3
9 changed files with 821 additions and 153 deletions
+2
View File
@@ -54,6 +54,8 @@ SAM_DATABASE_URL=mysql://user:password@host:3306/sige_sam_v3
LOCAL_VIDEO_BRIDGE_URL=http://localhost:8000
EMBEDDING_MODEL=nomic-embed-text
LOCAL_LLM_MODEL=llama3.2:3b
# Workers para procesamiento RAG asíncrono post-ZIP ("túneles"): 1..12
ZIP_RAG_CONCURRENCY=5
# Backend-to-backend (LMS -> CMS)
CMS_API_URL=http://studio:3001
+23 -4
View File
@@ -3,11 +3,30 @@
# Allow large ZIP uploads (RAG bulk import can exceed 2GB).
client_max_body_size 4096m;
client_body_timeout 1800s;
client_body_timeout 43200s;
# API routes that need to go to port 3001
# Prefer the explicit `/cms-api/*` prefix for frontend fetches. This avoids collisions
# with Next.js pages like `/courses` and `/admin` that share the same host.
location = /cms-api/api/assets/import-zip {
# Upload/import of large ZIPs can run for several minutes.
# Keep this route unbuffered and with very high upstream timeouts.
rewrite ^/cms-api/(.*)$ /$1 break;
proxy_pass http://openccb-studio:3001;
proxy_request_buffering off;
proxy_buffering off;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $http_x_forwarded_proto;
proxy_set_header Connection "";
proxy_http_version 1.1;
proxy_connect_timeout 7200s;
proxy_send_timeout 43200s;
proxy_read_timeout 43200s;
send_timeout 43200s;
}
location /cms-api/ {
# CORS safety net at proxy level for CMS API.
set $cors_origin "";
@@ -42,9 +61,9 @@ location /cms-api/ {
proxy_set_header Connection "";
proxy_http_version 1.1;
proxy_connect_timeout 300s;
proxy_send_timeout 7200s;
proxy_read_timeout 7200s;
send_timeout 7200s;
proxy_send_timeout 43200s;
proxy_read_timeout 43200s;
send_timeout 43200s;
}
location /lms-api/ {
+1 -4
View File
@@ -121,7 +121,4 @@
1. Finalización de **Certificados y Progreso Real**.
2. Despliegue de **Infraestructura SMTP** para comunicación global.
3. Auditoría de **Accesibilidad Universal (WCAG)**.
4. Implementación de **IA de Moderación (Seguridad)**.
asdf
4. Implementación de **IA de Moderación (Seguridad)**.
@@ -0,0 +1,23 @@
CREATE TABLE IF NOT EXISTS background_tasks (
id UUID PRIMARY KEY,
organization_id UUID NOT NULL,
created_by UUID NOT NULL,
title TEXT NOT NULL,
course_title TEXT,
task_type VARCHAR(64) NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'queued',
progress INTEGER NOT NULL DEFAULT 0,
total_items INTEGER NOT NULL DEFAULT 0,
processed_items INTEGER NOT NULL DEFAULT 0,
failed_items INTEGER NOT NULL DEFAULT 0,
error_message TEXT,
metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_background_tasks_org_status_updated
ON background_tasks (organization_id, status, updated_at DESC);
CREATE INDEX IF NOT EXISTS idx_background_tasks_type_status
ON background_tasks (task_type, status);
+30 -13
View File
@@ -23,20 +23,37 @@ pub async fn get_background_tasks(
State(pool): State<PgPool>,
) -> Result<Json<Vec<BackgroundTask>>, (StatusCode, String)> {
let query = r#"
SELECT
l.id,
l.title,
c.title as course_title,
'lesson_transcription' as task_type,
l.transcription_status as status,
0 as progress,
l.updated_at
FROM lessons l
JOIN modules m ON l.module_id = m.id
JOIN courses c ON m.course_id = c.id
WHERE l.transcription_status IN ('queued', 'processing', 'failed')
SELECT id, title, course_title, task_type, status, progress, updated_at
FROM (
SELECT
l.id,
l.title,
c.title as course_title,
'lesson_transcription' as task_type,
l.transcription_status as status,
0 as progress,
l.updated_at
FROM lessons l
JOIN modules m ON l.module_id = m.id
JOIN courses c ON m.course_id = c.id
WHERE l.transcription_status IN ('queued', 'processing', 'failed')
UNION ALL
SELECT
t.id,
t.title,
t.course_title,
t.task_type,
t.status,
t.progress,
t.updated_at
FROM background_tasks t
WHERE t.task_type = 'zip_rag_import'
AND t.status IN ('queued', 'processing', 'failed', 'completed')
) merged
ORDER BY updated_at DESC
LIMIT 200
"#;
let tasks = sqlx::query_as::<_, BackgroundTask>(query)
+698 -119
View File
@@ -20,6 +20,7 @@ use uuid::Uuid;
use std::collections::HashMap;
use std::env;
use std::path::Path as StdPath;
use std::sync::Arc;
use tokio::process::Command;
use tokio::io::AsyncWriteExt;
use tokio::task::JoinSet;
@@ -47,6 +48,8 @@ pub struct AssetZipImportResponse {
pub rag_ingested_assets: usize,
pub rag_chunks_ingested: usize,
pub failed_entries: Vec<String>,
pub rag_background_started: bool,
pub rag_background_items: usize,
}
#[derive(Debug, Deserialize)]
@@ -133,6 +136,18 @@ fn build_s3_object_key(org_id: Uuid, course_id: Option<Uuid>, storage_filename:
}
}
fn build_ready_for_rag_path(org_id: Uuid, asset_id: Uuid, filename: &str) -> String {
let ext = StdPath::new(filename)
.extension()
.and_then(|s| s.to_str())
.unwrap_or("");
if ext.is_empty() {
format!("uploads/ready-for-rag/{}/{}", org_id, asset_id)
} else {
format!("uploads/ready-for-rag/{}/{}.{}", org_id, asset_id, ext)
}
}
fn build_s3_public_url(settings: &S3Settings, key: &str) -> String {
if let Some(base) = &settings.public_base_url {
return format!("{}/{}", base.trim_end_matches('/'), key);
@@ -700,6 +715,109 @@ struct ZipEntryData {
is_flv: bool,
}
struct PendingZipRagItem {
entry_name: String,
asset: Asset,
is_audio_video: bool,
unit_number: Option<i32>,
}
async fn create_zip_rag_background_task(
pool: &PgPool,
org_id: Uuid,
user_id: Uuid,
course_id: Option<Uuid>,
total_items: usize,
) -> Result<Uuid, sqlx::Error> {
let task_id = Uuid::new_v4();
let course_title = if let Some(cid) = course_id {
sqlx::query_scalar::<_, String>("SELECT title FROM courses WHERE id = $1")
.bind(cid)
.fetch_optional(pool)
.await?
} else {
None
};
sqlx::query(
r#"
INSERT INTO background_tasks (
id,
organization_id,
created_by,
title,
course_title,
task_type,
status,
progress,
total_items,
processed_items,
failed_items,
metadata,
created_at,
updated_at
)
VALUES (
$1, $2, $3,
'ZIP import RAG processing',
$4,
'zip_rag_import',
'queued',
0,
$5,
0,
0,
'{}'::jsonb,
NOW(),
NOW()
)
"#,
)
.bind(task_id)
.bind(org_id)
.bind(user_id)
.bind(course_title)
.bind(total_items as i32)
.execute(pool)
.await?;
Ok(task_id)
}
async fn set_zip_rag_task_status(
pool: &PgPool,
task_id: Uuid,
status: &str,
progress: i32,
processed_items: usize,
failed_items: usize,
error_message: Option<&str>,
) -> Result<(), sqlx::Error> {
sqlx::query(
r#"
UPDATE background_tasks
SET status = $2,
progress = $3,
processed_items = $4,
failed_items = $5,
error_message = $6,
updated_at = NOW()
WHERE id = $1
AND task_type = 'zip_rag_import'
"#,
)
.bind(task_id)
.bind(status)
.bind(progress)
.bind(processed_items as i32)
.bind(failed_items as i32)
.bind(error_message)
.execute(pool)
.await?;
Ok(())
}
async fn process_zip_entry_without_rag(
entry: ZipEntryData,
org_id: Uuid,
@@ -714,6 +832,8 @@ async fn process_zip_entry_without_rag(
sam_course_id_r2: Option<i32>,
s3_settings: Option<S3Settings>,
s3_client: Option<S3Client>,
use_dev_processing: bool,
ingest_rag: bool,
) -> Result<(), String> {
let ZipEntryData {
entry_name,
@@ -735,39 +855,76 @@ async fn process_zip_entry_without_rag(
let asset_id = Uuid::new_v4();
let (storage_path, stored_filename, mimetype) = if is_flv {
let temp_storage_filename = format!("{}.flv", asset_id);
let temp_storage_path = format!("uploads/{}", temp_storage_filename);
tokio::fs::write(&temp_storage_path, &content)
.await
.map_err(|e| format!("{}: Error en la escritura local ({})", entry_name, e))?;
let final_storage_filename = format!("{}.mp4", asset_id);
let final_storage_path = format!("uploads/{}", final_storage_filename);
if let Err((_, msg)) = transcode_flv_to_mp4(&temp_storage_path, &final_storage_path).await {
let _ = tokio::fs::remove_file(&temp_storage_path).await;
return Err(format!("{}: la transcodificación de flv falló ({})", entry_name, msg));
}
let _ = tokio::fs::remove_file(&temp_storage_path).await;
(
final_storage_path,
replace_extension(&safe_filename, "mp4"),
"video/mp4".to_string(),
)
} else {
let extension = StdPath::new(&safe_filename)
.extension()
.and_then(|s| s.to_str())
.unwrap_or("");
let storage_filename = if extension.is_empty() {
asset_id.to_string()
if use_dev_processing && ingest_rag {
let storage_path = build_ready_for_rag_path(org_id, asset_id, &format!("{}.flv", asset_id));
tokio::fs::create_dir_all(StdPath::new(&storage_path).parent().unwrap_or(StdPath::new(".")))
.await
.map_err(|e| format!("{}: Error creating ready-for-rag dir ({})", entry_name, e))?;
tokio::fs::write(&storage_path, &content)
.await
.map_err(|e| format!("{}: Error en la escritura local ({})", entry_name, e))?;
(
storage_path,
safe_filename.clone(),
if guessed_mimetype.is_empty() { "video/x-flv".to_string() } else { guessed_mimetype.clone() },
)
} else if use_dev_processing {
let storage_filename = format!("{}.flv", asset_id);
let storage_path = format!("uploads/{}", storage_filename);
tokio::fs::write(&storage_path, &content)
.await
.map_err(|e| format!("{}: Error en la escritura local ({})", entry_name, e))?;
(
storage_path,
safe_filename.clone(),
if guessed_mimetype.is_empty() { "video/x-flv".to_string() } else { guessed_mimetype.clone() },
)
} else {
format!("{}.{}", asset_id, extension)
};
let storage_path = format!("uploads/{}", storage_filename);
let temp_storage_filename = format!("{}.flv", asset_id);
let temp_storage_path = format!("uploads/{}", temp_storage_filename);
tokio::fs::write(&temp_storage_path, &content)
.await
.map_err(|e| format!("{}: Error en la escritura local ({})", entry_name, e))?;
(storage_path, safe_filename.clone(), guessed_mimetype)
let final_storage_filename = format!("{}.mp4", asset_id);
let final_storage_path = format!("uploads/{}", final_storage_filename);
if let Err((_, msg)) = transcode_flv_to_mp4(&temp_storage_path, &final_storage_path).await {
let _ = tokio::fs::remove_file(&temp_storage_path).await;
return Err(format!("{}: la transcodificación de flv falló ({})", entry_name, msg));
}
let _ = tokio::fs::remove_file(&temp_storage_path).await;
(
final_storage_path,
replace_extension(&safe_filename, "mp4"),
"video/mp4".to_string(),
)
}
} else {
if use_dev_processing && ingest_rag {
let storage_path = build_ready_for_rag_path(org_id, asset_id, &safe_filename);
tokio::fs::create_dir_all(StdPath::new(&storage_path).parent().unwrap_or(StdPath::new(".")))
.await
.map_err(|e| format!("{}: Error creating ready-for-rag dir ({})", entry_name, e))?;
tokio::fs::write(&storage_path, &content)
.await
.map_err(|e| format!("{}: Error en la escritura local ({})", entry_name, e))?;
(storage_path, safe_filename.clone(), guessed_mimetype)
} else {
let extension = StdPath::new(&safe_filename)
.extension()
.and_then(|s| s.to_str())
.unwrap_or("");
let storage_filename = if extension.is_empty() {
asset_id.to_string()
} else {
format!("{}.{}", asset_id, extension)
};
let storage_path = format!("uploads/{}", storage_filename);
(storage_path, safe_filename.clone(), guessed_mimetype)
}
};
let storage_filename_for_s3 = StdPath::new(&storage_path)
@@ -858,6 +1015,7 @@ pub async fn import_assets_zip(
let mut split_to_regular = false;
let mut sam_course_id_r1: Option<i32> = None;
let mut sam_course_id_r2: Option<i32> = None;
let mut use_dev_processing = false;
while let Some(mut field) = multipart
.next_field()
@@ -940,6 +1098,11 @@ pub async fn import_assets_zip(
sam_course_id_r2 = Some(id);
}
}
} else if name == "use_dev_processing" {
if let Ok(txt) = field.text().await {
let v = txt.trim().to_lowercase();
use_dev_processing = v == "1" || v == "true" || v == "yes";
}
}
}
@@ -1046,22 +1209,28 @@ pub async fn import_assets_zip(
let mut rag_ingested_assets = 0usize;
let mut rag_chunks_ingested = 0usize;
let mut failed_entries: Vec<String> = Vec::new();
let mut pending_rag_items: Vec<PendingZipRagItem> = Vec::new();
// unit_number → (asset_id, public_url): populated from audio/video assets
let mut unit_audio_map: HashMap<i32, (Uuid, String)> = HashMap::new();
let rag_client = if ingest_rag {
Some(
reqwest::Client::builder()
.danger_accept_invalid_certs(true)
.danger_accept_invalid_hostnames(true)
.build()
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("HTTP client error: {}", e)))?,
)
let ollama_url = if use_dev_processing {
std::env::var("ZIP_DEV_OLLAMA_URL")
.ok()
.filter(|v| !v.trim().is_empty())
.or_else(|| std::env::var("DEV_OLLAMA_URL").ok().filter(|v| !v.trim().is_empty()))
.unwrap_or_else(ai::get_ollama_url)
} else {
ai::get_ollama_url()
};
let whisper_url_override = if use_dev_processing {
std::env::var("ZIP_DEV_WHISPER_URL")
.ok()
.filter(|v| !v.trim().is_empty())
.or_else(|| std::env::var("DEV_WHISPER_URL").ok().filter(|v| !v.trim().is_empty()))
} else {
None
};
let ollama_url = ai::get_ollama_url();
let model = ai::get_embedding_model();
if !ingest_rag {
@@ -1105,6 +1274,8 @@ pub async fn import_assets_zip(
sam_course_id_r2,
s3_settings_cl,
s3_client_cl,
use_dev_processing,
false,
)
.await
});
@@ -1125,6 +1296,8 @@ pub async fn import_assets_zip(
rag_ingested_assets: 0,
rag_chunks_ingested: 0,
failed_entries,
rag_background_started: false,
rag_background_items: 0,
}));
}
@@ -1150,36 +1323,64 @@ pub async fn import_assets_zip(
let asset_id = Uuid::new_v4();
let (storage_path, stored_filename, mimetype) = if is_flv {
let temp_storage_filename = format!("{}.flv", asset_id);
let temp_storage_path = format!("uploads/{}", temp_storage_filename);
tokio::fs::write(&temp_storage_path, &content)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let final_storage_filename = format!("{}.mp4", asset_id);
let final_storage_path = format!("uploads/{}", final_storage_filename);
if let Err((_, msg)) = transcode_flv_to_mp4(&temp_storage_path, &final_storage_path).await {
let _ = tokio::fs::remove_file(&temp_storage_path).await;
failed_entries.push(format!("{}: flv transcode failed ({})", entry_name, msg));
continue;
}
let _ = tokio::fs::remove_file(&temp_storage_path).await;
(final_storage_path, replace_extension(&safe_filename, "mp4"), "video/mp4".to_string())
} else {
let extension = StdPath::new(&safe_filename)
.extension()
.and_then(|s| s.to_str())
.unwrap_or("");
let storage_filename = if extension.is_empty() {
asset_id.to_string()
if use_dev_processing {
let storage_path = build_ready_for_rag_path(org_ctx.id, asset_id, &format!("{}.flv", asset_id));
tokio::fs::create_dir_all(StdPath::new(&storage_path).parent().unwrap_or(StdPath::new(".")))
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error creating ready-for-rag dir: {}", e)))?;
if let Err(e) = tokio::fs::write(&storage_path, &content).await {
failed_entries.push(format!("{}: local write failed ({})", entry_name, e));
continue;
}
(
storage_path,
safe_filename.clone(),
if guessed_mimetype.is_empty() { "video/x-flv".to_string() } else { guessed_mimetype.clone() },
)
} else {
format!("{}.{}", asset_id, extension)
};
let storage_path = format!("uploads/{}", storage_filename);
let temp_storage_filename = format!("{}.flv", asset_id);
let temp_storage_path = format!("uploads/{}", temp_storage_filename);
tokio::fs::write(&temp_storage_path, &content)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
(storage_path, safe_filename.clone(), guessed_mimetype)
let final_storage_filename = format!("{}.mp4", asset_id);
let final_storage_path = format!("uploads/{}", final_storage_filename);
if let Err((_, msg)) = transcode_flv_to_mp4(&temp_storage_path, &final_storage_path).await {
let _ = tokio::fs::remove_file(&temp_storage_path).await;
failed_entries.push(format!("{}: flv transcode failed ({})", entry_name, msg));
continue;
}
let _ = tokio::fs::remove_file(&temp_storage_path).await;
(final_storage_path, replace_extension(&safe_filename, "mp4"), "video/mp4".to_string())
}
} else {
if use_dev_processing {
let storage_path = build_ready_for_rag_path(org_ctx.id, asset_id, &safe_filename);
tokio::fs::create_dir_all(StdPath::new(&storage_path).parent().unwrap_or(StdPath::new(".")))
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error creating ready-for-rag dir: {}", e)))?;
if let Err(e) = tokio::fs::write(&storage_path, &content).await {
failed_entries.push(format!("{}: local write failed ({})", entry_name, e));
continue;
}
(storage_path, safe_filename.clone(), guessed_mimetype)
} else {
let extension = StdPath::new(&safe_filename)
.extension()
.and_then(|s| s.to_str())
.unwrap_or("");
let storage_filename = if extension.is_empty() {
asset_id.to_string()
} else {
format!("{}.{}", asset_id, extension)
};
let storage_path = format!("uploads/{}", storage_filename);
(storage_path, safe_filename.clone(), guessed_mimetype)
}
};
let storage_filename_for_s3 = StdPath::new(&storage_path)
@@ -1297,77 +1498,319 @@ pub async fn import_assets_zip(
created_at: chrono::Utc::now(),
};
// For text/PDF entries, look up the audio asset from the same unit
let (linked_audio_id, linked_audio_url) = if !is_audio_video {
match unit_number.and_then(|u| unit_audio_map.get(&u)) {
Some((aid, aurl)) => (Some(*aid), Some(aurl.clone())),
None => (None, None),
pending_rag_items.push(PendingZipRagItem {
entry_name,
asset,
is_audio_video,
unit_number,
});
}
}
let mut rag_background_started = false;
let mut rag_background_items = 0usize;
if ingest_rag && !pending_rag_items.is_empty() {
let pool_bg = pool.clone();
let org_id_bg = org_ctx.id;
let user_id_bg = claims.sub;
let ollama_url_bg = ollama_url.clone();
let whisper_url_bg = whisper_url_override.clone();
let model_bg = model.clone();
let unit_audio_map_bg = unit_audio_map.clone();
let queued_count = pending_rag_items.len();
let task_id = match create_zip_rag_background_task(
&pool,
org_ctx.id,
claims.sub,
course_id,
queued_count,
)
.await
{
Ok(id) => Some(id),
Err(e) => {
tracing::warn!("ZIP async RAG: no se pudo crear background task ({})", e);
None
}
};
let rag_concurrency = env::var("ZIP_RAG_CONCURRENCY")
.ok()
.and_then(|v| v.parse::<usize>().ok())
.map(|v| v.clamp(1, 12))
.unwrap_or(5);
rag_background_started = true;
rag_background_items = queued_count;
tokio::spawn(async move {
if let Some(tid) = task_id {
let _ = set_zip_rag_task_status(&pool_bg, tid, "processing", 0, 0, 0, None).await;
}
let client = match reqwest::Client::builder()
.danger_accept_invalid_certs(true)
.danger_accept_invalid_hostnames(true)
.build()
{
Ok(c) => c,
Err(e) => {
tracing::error!("ZIP async RAG: no se pudo crear cliente HTTP: {}", e);
return;
}
} else {
(None, None)
};
match extract_asset_text(&asset).await {
Ok(extracted) => {
let trimmed = extracted.trim();
if trimmed.len() < 80 {
failed_entries.push(format!("{}: contenido insuficiente para RAG", entry_name));
continue;
}
let mut ingested_assets = 0usize;
let mut ingested_chunks = 0usize;
let mut processed_items = 0usize;
let mut failed_items = 0usize;
let chunks = chunk_text(trimmed, 900);
if chunks.is_empty() {
failed_entries.push(format!("{}: no se pudieron generar chunks", entry_name));
continue;
}
let mut pending_rag_items = pending_rag_items;
let mut unit_audio_map_bg = unit_audio_map_bg;
let source_kind = if is_audio_video {
for item in pending_rag_items.iter_mut() {
if !is_flv_media(&item.asset.filename, &item.asset.mimetype) {
continue;
}
match normalize_flv_asset_for_rag(&pool_bg, &mut item.asset).await {
Ok(()) => {
if item.is_audio_video {
if let Some(u) = item.unit_number {
unit_audio_map_bg.insert(
u,
(item.asset.id, build_public_url_from_storage_path(&item.asset.storage_path)),
);
}
}
}
Err((_, msg)) => {
tracing::warn!(
"ZIP async RAG: no se pudo normalizar FLV {} ({})",
item.entry_name,
msg
);
}
}
}
let unit_audio_map_bg = Arc::new(unit_audio_map_bg);
let mut join_set: JoinSet<(usize, usize, bool)> = JoinSet::new();
for item in pending_rag_items {
while join_set.len() >= rag_concurrency {
match join_set.join_next().await {
Some(Ok((assets_ok, chunks_ok, failed))) => {
ingested_assets += assets_ok;
ingested_chunks += chunks_ok;
processed_items += 1;
if failed {
failed_items += 1;
}
if let Some(tid) = task_id {
let progress = ((processed_items * 100) / queued_count.max(1)) as i32;
let _ = set_zip_rag_task_status(
&pool_bg,
tid,
"processing",
progress,
processed_items,
failed_items,
None,
)
.await;
}
}
Some(Err(e)) => {
processed_items += 1;
failed_items += 1;
tracing::warn!("ZIP async RAG: worker fallo ({})", e);
if let Some(tid) = task_id {
let progress = ((processed_items * 100) / queued_count.max(1)) as i32;
let _ = set_zip_rag_task_status(
&pool_bg,
tid,
"processing",
progress,
processed_items,
failed_items,
None,
)
.await;
}
}
None => break,
}
}
let pool_w = pool_bg.clone();
let client_w = client.clone();
let ollama_url_w = ollama_url_bg.clone();
let whisper_url_w = whisper_url_bg.clone();
let model_w = model_bg.clone();
let audio_map_w = unit_audio_map_bg.clone();
join_set.spawn(async move {
let source_kind = if item.is_audio_video {
"audio-transcription"
} else if mimetype.contains("pdf") {
} else if item.asset.mimetype.contains("pdf") {
"pdf"
} else {
"text"
};
let skill = if is_audio_video {
let skill = if item.is_audio_video {
Some("listening")
} else {
Some("reading")
};
if let Some(client) = &rag_client {
match ingest_chunks_to_question_bank(
&pool,
org_ctx.id,
claims.sub,
&asset,
source_kind,
skill,
&chunks,
client,
&ollama_url,
&model,
linked_audio_id,
linked_audio_url,
unit_number,
)
.await
{
Ok(()) => {
rag_ingested_assets += 1;
rag_chunks_ingested += chunks.len();
let (linked_audio_id, linked_audio_url) = if !item.is_audio_video {
match item.unit_number.and_then(|u| audio_map_w.get(&u)) {
Some((aid, aurl)) => (Some(*aid), Some(aurl.clone())),
None => (None, None),
}
} else {
(None, None)
};
match extract_asset_text_with_endpoints(&item.asset, whisper_url_w.as_deref()).await {
Ok(extracted) => {
let trimmed = extracted.trim();
if trimmed.len() < 80 {
tracing::warn!(
"ZIP async RAG: {} contenido insuficiente para RAG",
item.entry_name
);
return (0, 0, true);
}
Err((_, msg)) => {
failed_entries.push(format!("{}: rag ingest failed ({})", entry_name, msg));
let chunks = chunk_text(trimmed, 900);
if chunks.is_empty() {
tracing::warn!(
"ZIP async RAG: {} no genero chunks",
item.entry_name
);
return (0, 0, true);
}
match ingest_chunks_to_question_bank(
&pool_w,
org_id_bg,
user_id_bg,
&item.asset,
source_kind,
skill,
&chunks,
&client_w,
&ollama_url_w,
&model_w,
linked_audio_id,
linked_audio_url,
item.unit_number,
)
.await
{
Ok(()) => (1, chunks.len(), false),
Err((_, msg)) => {
tracing::warn!(
"ZIP async RAG: {} ingest fallo ({})",
item.entry_name,
msg
);
(0, 0, true)
}
}
}
Err((_, msg)) => {
tracing::warn!(
"ZIP async RAG: {} extract fallo ({})",
item.entry_name,
msg
);
(0, 0, true)
}
}
});
}
while let Some(result) = join_set.join_next().await {
match result {
Ok((assets_ok, chunks_ok, failed)) => {
ingested_assets += assets_ok;
ingested_chunks += chunks_ok;
processed_items += 1;
if failed {
failed_items += 1;
}
if let Some(tid) = task_id {
let progress = ((processed_items * 100) / queued_count.max(1)) as i32;
let _ = set_zip_rag_task_status(
&pool_bg,
tid,
"processing",
progress,
processed_items,
failed_items,
None,
)
.await;
}
}
Err(e) => {
processed_items += 1;
failed_items += 1;
tracing::warn!("ZIP async RAG: worker fallo ({})", e);
if let Some(tid) = task_id {
let progress = ((processed_items * 100) / queued_count.max(1)) as i32;
let _ = set_zip_rag_task_status(
&pool_bg,
tid,
"processing",
progress,
processed_items,
failed_items,
None,
)
.await;
}
}
}
Err((_, msg)) => {
failed_entries.push(format!("{}: extract failed ({})", entry_name, msg));
}
}
}
if let Some(tid) = task_id {
let final_status = if failed_items > 0 { "failed" } else { "completed" };
let final_message = if failed_items > 0 {
Some("Uno o más archivos fallaron durante la extracción o la ingesta RAG")
} else {
None
};
let _ = set_zip_rag_task_status(
&pool_bg,
tid,
final_status,
100,
queued_count,
failed_items,
final_message,
)
.await;
}
tracing::info!(
"ZIP async RAG finalizado: {} assets, {} chunks (concurrency={})",
ingested_assets,
ingested_chunks,
rag_concurrency
);
});
failed_entries.push(format!(
"Ingestion RAG iniciada en segundo plano para {} archivos. Puedes continuar usando el sistema mientras finaliza.",
queued_count
));
rag_ingested_assets = 0;
rag_chunks_ingested = 0;
}
let _ = tokio::fs::remove_file(&zip_path).await;
@@ -1377,6 +1820,8 @@ pub async fn import_assets_zip(
rag_ingested_assets,
rag_chunks_ingested,
failed_entries,
rag_background_started,
rag_background_items,
}))
}
@@ -1397,6 +1842,123 @@ fn replace_extension(filename: &str, new_ext: &str) -> String {
format!("{}.{}", base, new_ext)
}
fn replace_last_path_extension(path: &str, new_ext: &str) -> String {
if let Some((prefix, last)) = path.rsplit_once('/') {
return format!("{}/{}", prefix, replace_extension(last, new_ext));
}
replace_extension(path, new_ext)
}
fn build_public_url_from_storage_path(storage_path: &str) -> String {
if let Some((_, key)) = parse_s3_storage_path(storage_path) {
if let Some(settings) = get_s3_settings() {
return build_s3_public_url(&settings, key);
}
return storage_path.to_string();
}
let storage_filename = StdPath::new(storage_path)
.file_name()
.and_then(|s| s.to_str())
.unwrap_or("");
format!("/assets/{}", storage_filename)
}
async fn normalize_flv_asset_for_rag(
pool: &PgPool,
asset: &mut Asset,
) -> Result<(), (StatusCode, String)> {
if !is_flv_media(&asset.filename, &asset.mimetype) {
return Ok(());
}
tokio::fs::create_dir_all("uploads/tmp")
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error creating temp dir: {}", e)))?;
let input_path = format!("uploads/tmp/flv-normalize-in-{}.flv", asset.id);
let output_path = format!("uploads/tmp/flv-normalize-out-{}.mp4", asset.id);
let source_bytes = read_storage_bytes(&asset.storage_path).await?;
tokio::fs::write(&input_path, source_bytes)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error writing temp FLV: {}", e)))?;
if let Err(e) = transcode_flv_to_mp4(&input_path, &output_path).await {
let _ = tokio::fs::remove_file(&input_path).await;
let _ = tokio::fs::remove_file(&output_path).await;
return Err(e);
}
let _ = tokio::fs::remove_file(&input_path).await;
let output_bytes = tokio::fs::read(&output_path)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error reading temp MP4: {}", e)))?;
let _ = tokio::fs::remove_file(&output_path).await;
let next_storage_path = replace_last_path_extension(&asset.storage_path, "mp4");
if let Some((bucket, key)) = parse_s3_storage_path(&next_storage_path) {
let settings = get_s3_settings().ok_or((
StatusCode::INTERNAL_SERVER_ERROR,
"S3 path detected but storage is not configured".to_string(),
))?;
let client = build_s3_client(&settings).await?;
let old_storage_path = asset.storage_path.clone();
client
.put_object()
.bucket(bucket)
.key(key)
.content_type("video/mp4")
.body(output_bytes.clone().into())
.send()
.await
.map_err(|e| (StatusCode::BAD_GATEWAY, format!("Error uploading normalized MP4 to S3: {}", e)))?;
if old_storage_path != next_storage_path {
let _ = delete_storage_path(&old_storage_path).await;
}
} else {
tokio::fs::write(&next_storage_path, &output_bytes)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error writing normalized MP4: {}", e)))?;
if asset.storage_path != next_storage_path {
let _ = tokio::fs::remove_file(&asset.storage_path).await;
}
}
let next_filename = replace_extension(&asset.filename, "mp4");
let next_size = output_bytes.len() as i64;
sqlx::query(
r#"
UPDATE assets
SET filename = $1,
storage_path = $2,
mimetype = $3,
size_bytes = $4
WHERE id = $5
"#,
)
.bind(&next_filename)
.bind(&next_storage_path)
.bind("video/mp4")
.bind(next_size)
.bind(asset.id)
.execute(pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error updating normalized asset: {}", e)))?;
asset.filename = next_filename;
asset.storage_path = next_storage_path;
asset.mimetype = "video/mp4".to_string();
asset.size_bytes = next_size;
Ok(())
}
async fn transcode_flv_to_mp4(input_path: &str, output_path: &str) -> Result<(), (StatusCode, String)> {
let output = Command::new("ffmpeg")
.arg("-y")
@@ -1514,12 +2076,19 @@ async fn ingest_chunks_to_question_bank(
}
async fn extract_asset_text(asset: &Asset) -> Result<String, (StatusCode, String)> {
extract_asset_text_with_endpoints(asset, None).await
}
async fn extract_asset_text_with_endpoints(
asset: &Asset,
whisper_url_override: Option<&str>,
) -> Result<String, (StatusCode, String)> {
let lower_name = asset.filename.to_lowercase();
let mimetype = asset.mimetype.to_lowercase();
if mimetype.starts_with("audio/") || mimetype.starts_with("video/") {
let bytes = read_storage_bytes(&asset.storage_path).await?;
return transcribe_media_bytes(bytes, &asset.filename).await;
return transcribe_media_bytes_with_override(bytes, &asset.filename, whisper_url_override).await;
}
if mimetype.contains("pdf") || lower_name.ends_with(".pdf") {
@@ -1583,8 +2152,18 @@ async fn extract_pdf_text_from_bytes(bytes: Vec<u8>) -> Result<String, (StatusCo
Ok(text)
}
async fn transcribe_media_bytes(file_data: Vec<u8>, filename: &str) -> Result<String, (StatusCode, String)> {
async fn transcribe_media_bytes_with_override(
file_data: Vec<u8>,
filename: &str,
whisper_url_override: Option<&str>,
) -> Result<String, (StatusCode, String)> {
let mut whisper_urls: Vec<String> = Vec::new();
if let Some(url) = whisper_url_override {
let trimmed = url.trim();
if !trimmed.is_empty() {
whisper_urls.push(trimmed.trim_end_matches('/').to_string());
}
}
if let Ok(url) = std::env::var("WHISPER_URL") {
let trimmed = url.trim();
if !trimmed.is_empty() {
+22 -3
View File
@@ -6,13 +6,14 @@ import { Upload, Database, FileArchive, CheckCircle2, AlertTriangle, Scissors }
export default function AdminSharedMaterialsPage() {
const [zipFile, setZipFile] = useState<File | null>(null);
const [ingestRag, setIngestRag] = useState(true);
const [ingestRag, setIngestRag] = useState(false);
const [englishLevel, setEnglishLevel] = useState('');
const [plans, setPlans] = useState<MySqlPlan[]>([]);
const [courses, setCourses] = useState<MySqlCourse[]>([]);
const [selectedPlanId, setSelectedPlanId] = useState<number | ''>('');
const [selectedCourseId, setSelectedCourseId] = useState<number | ''>('');
const [splitToRegular, setSplitToRegular] = useState(false);
const [useDevProcessing, setUseDevProcessing] = useState(false);
const [regularPlanId, setRegularPlanId] = useState<number | ''>('');
const [regularCourses, setRegularCourses] = useState<MySqlCourse[]>([]);
const [selectedCourseIdR1, setSelectedCourseIdR1] = useState<number | ''>('');
@@ -27,6 +28,8 @@ export default function AdminSharedMaterialsPage() {
rag_ingested_assets: number;
rag_chunks_ingested: number;
failed_entries: string[];
rag_background_started?: boolean;
rag_background_items?: number;
} | null>(null);
const canUpload = useMemo(() => Boolean(zipFile) && !loading, [zipFile, loading]);
@@ -159,6 +162,7 @@ export default function AdminSharedMaterialsPage() {
splitToRegular,
selectedCourseIdR1 || undefined,
selectedCourseIdR2 || undefined,
useDevProcessing,
);
setResult(response);
setPhase('done');
@@ -218,7 +222,16 @@ export default function AdminSharedMaterialsPage() {
checked={ingestRag}
onChange={(e) => setIngestRag(e.target.checked)}
/>
<span className="font-medium">Ingerir automaticamente en RAG al importar</span>
<span className="font-medium">Ingerir automaticamente en RAG al importar (recomendado activar solo para ZIPs pequeños)</span>
</label>
<label className="flex items-center gap-3 rounded-lg border border-sky-200 bg-sky-50 px-4 py-3 text-sm text-sky-900">
<input
type="checkbox"
checked={useDevProcessing}
onChange={(e) => setUseDevProcessing(e.target.checked)}
/>
<span className="font-medium">Procesar este ZIP con infraestructura DEV (más potente) para transcripción/IA</span>
</label>
<div className="space-y-2">
@@ -396,7 +409,7 @@ export default function AdminSharedMaterialsPage() {
<p className="text-sm text-slate-700">{statusText}</p>
<p className="text-xs text-slate-500">
Nota: esta importacion ZIP corre en la misma solicitud (no crea fila en Tasks), por eso aqui ves el estado en vivo.
Nota: la subida e importación base terminan en esta solicitud. Si activas RAG, su procesamiento puede continuar en segundo plano.
</p>
</div>
)}
@@ -405,6 +418,12 @@ export default function AdminSharedMaterialsPage() {
{result && (
<div className="rounded-2xl border border-slate-200 dark:border-white/10 bg-white dark:bg-white/[0.02] p-6 space-y-4">
<h3 className="text-lg font-bold text-slate-900 dark:text-white">Resultado de la Importacion</h3>
{result.rag_background_started && (
<div className="inline-flex items-center gap-2 rounded-full border border-indigo-200 bg-indigo-50 px-3 py-1 text-xs font-semibold text-indigo-800">
<span className="h-2 w-2 animate-pulse rounded-full bg-indigo-500" />
RAG en segundo plano: {result.rag_background_items ?? 0} archivos en procesamiento
</div>
)}
<div className="grid grid-cols-1 md:grid-cols-3 gap-4">
<div className="rounded-lg border border-slate-200 dark:border-white/10 p-4">
<div className="flex items-center gap-2 text-slate-700 dark:text-gray-300">
+15 -9
View File
@@ -98,6 +98,10 @@ export default function BackgroundTasksPage() {
label = 'Transcription';
color = 'bg-purple-100 text-purple-800';
break;
case 'zip_rag_import':
label = 'ZIP RAG';
color = 'bg-indigo-100 text-indigo-800';
break;
case 'lesson_image':
label = 'Lesson Image';
color = 'bg-blue-100 text-blue-800';
@@ -165,7 +169,7 @@ export default function BackgroundTasksPage() {
<div className="text-xs text-gray-400">({format(new Date(task.updated_at), 'yyyy')})</div>
</td>
<td className="px-6 py-4 text-right space-x-2">
{task.status === 'failed' && (
{task.task_type === 'lesson_transcription' && task.status === 'failed' && (
<button
onClick={() => handleRetry(task.id)}
disabled={actionLoading === task.id}
@@ -175,14 +179,16 @@ export default function BackgroundTasksPage() {
Retry
</button>
)}
<button
onClick={() => handleCancel(task.id)}
disabled={actionLoading === task.id}
className="inline-flex items-center px-3 py-1.5 border border-red-200 text-xs font-medium rounded-md text-red-700 bg-red-50 hover:bg-red-100 disabled:opacity-50"
>
{actionLoading === task.id ? <Loader2 className="w-3 h-3 animate-spin mr-1" /> : <XCircle className="w-3 h-3 mr-1" />}
Cancel
</button>
{task.task_type === 'lesson_transcription' && (
<button
onClick={() => handleCancel(task.id)}
disabled={actionLoading === task.id}
className="inline-flex items-center px-3 py-1.5 border border-red-200 text-xs font-medium rounded-md text-red-700 bg-red-50 hover:bg-red-100 disabled:opacity-50"
>
{actionLoading === task.id ? <Loader2 className="w-3 h-3 animate-spin mr-1" /> : <XCircle className="w-3 h-3 mr-1" />}
Cancel
</button>
)}
</td>
</tr>
))}
+7 -1
View File
@@ -755,6 +755,8 @@ export interface AssetZipImportResult {
rag_ingested_assets: number;
rag_chunks_ingested: number;
failed_entries: string[];
rag_background_started?: boolean;
rag_background_items?: number;
}
export interface Cohort {
@@ -1130,6 +1132,7 @@ export const cmsApi = {
splitToRegular = false,
samCourseIdR1?: number,
samCourseIdR2?: number,
useDevProcessing = false,
): Promise<AssetZipImportResult> => {
return new Promise((resolve, reject) => {
const maxNetworkRetries = 2;
@@ -1147,6 +1150,9 @@ export const cmsApi = {
if (samCourseIdR1) formData.append('sam_course_id_r1', String(samCourseIdR1));
if (samCourseIdR2) formData.append('sam_course_id_r2', String(samCourseIdR2));
}
if (useDevProcessing) {
formData.append('use_dev_processing', 'true');
}
const xhr = new XMLHttpRequest();
xhr.open('POST', `${API_BASE_URL}/api/assets/import-zip`);
@@ -1819,7 +1825,7 @@ export interface BackgroundTask {
id: string;
title: string;
course_title?: string;
task_type: 'lesson_transcription' | 'lesson_image' | 'course_image';
task_type: 'lesson_transcription' | 'lesson_image' | 'course_image' | 'zip_rag_import';
status: 'idle' | 'queued' | 'processing' | 'failed' | 'completed' | 'error';
progress: number;
updated_at: string;