Refactor code structure for improved readability and maintainability
This commit is contained in:
@@ -1,4 +1,5 @@
|
|||||||
use crate::handlers::run_transcription_task;
|
use crate::handlers::run_transcription_task;
|
||||||
|
use crate::handlers_assets::{ingest_asset_for_rag_core, set_zip_rag_task_status};
|
||||||
use axum::{
|
use axum::{
|
||||||
Json,
|
Json,
|
||||||
extract::{Path, State},
|
extract::{Path, State},
|
||||||
@@ -6,6 +7,7 @@ use axum::{
|
|||||||
};
|
};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use sqlx::{FromRow, PgPool};
|
use sqlx::{FromRow, PgPool};
|
||||||
|
use tokio::time::Duration;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
#[derive(Debug, Serialize, FromRow)]
|
#[derive(Debug, Serialize, FromRow)]
|
||||||
@@ -74,11 +76,24 @@ struct LessonStatusRow {
|
|||||||
transcription_status: Option<String>,
|
transcription_status: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(sqlx::FromRow)]
|
||||||
|
struct ZipRetryTaskRow {
|
||||||
|
id: Uuid,
|
||||||
|
organization_id: Uuid,
|
||||||
|
created_by: Uuid,
|
||||||
|
metadata: serde_json::Value,
|
||||||
|
created_at: chrono::DateTime<chrono::Utc>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(sqlx::FromRow)]
|
||||||
|
struct AssetIdRow {
|
||||||
|
id: Uuid,
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn retry_task(
|
pub async fn retry_task(
|
||||||
State(pool): State<PgPool>,
|
State(pool): State<PgPool>,
|
||||||
Path(id): Path<Uuid>,
|
Path(id): Path<Uuid>,
|
||||||
) -> Result<StatusCode, (StatusCode, String)> {
|
) -> Result<StatusCode, (StatusCode, String)> {
|
||||||
|
|
||||||
// Check lessons for transcription failures
|
// Check lessons for transcription failures
|
||||||
let lesson = sqlx::query_as::<_, LessonStatusRow>("SELECT transcription_status FROM lessons WHERE id = $1")
|
let lesson = sqlx::query_as::<_, LessonStatusRow>("SELECT transcription_status FROM lessons WHERE id = $1")
|
||||||
.bind(id)
|
.bind(id)
|
||||||
@@ -97,6 +112,176 @@ pub async fn retry_task(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let zip_task = sqlx::query_as::<_, ZipRetryTaskRow>(
|
||||||
|
r#"
|
||||||
|
SELECT id, organization_id, created_by, metadata, created_at
|
||||||
|
FROM background_tasks
|
||||||
|
WHERE id = $1
|
||||||
|
AND task_type = 'zip_rag_import'
|
||||||
|
AND status = 'failed'
|
||||||
|
"#,
|
||||||
|
)
|
||||||
|
.bind(id)
|
||||||
|
.fetch_optional(&pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
|
||||||
|
|
||||||
|
if let Some(task) = zip_task {
|
||||||
|
let zip_batch_id_from_metadata = task
|
||||||
|
.metadata
|
||||||
|
.get("zip_batch_id")
|
||||||
|
.and_then(|v| v.as_str())
|
||||||
|
.map(|raw| {
|
||||||
|
Uuid::parse_str(raw).map_err(|_| {
|
||||||
|
(
|
||||||
|
StatusCode::BAD_REQUEST,
|
||||||
|
"zip_batch_id inválido en metadata de la tarea".to_string(),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.transpose()?;
|
||||||
|
|
||||||
|
let zip_batch_id = if let Some(id) = zip_batch_id_from_metadata {
|
||||||
|
id
|
||||||
|
} else {
|
||||||
|
#[derive(sqlx::FromRow)]
|
||||||
|
struct ZipBatchIdRow {
|
||||||
|
zip_batch_id: Uuid,
|
||||||
|
}
|
||||||
|
|
||||||
|
let candidates = sqlx::query_as::<_, ZipBatchIdRow>(
|
||||||
|
r#"
|
||||||
|
SELECT zip_batch_id
|
||||||
|
FROM assets
|
||||||
|
WHERE organization_id = $1
|
||||||
|
AND uploaded_by = $2
|
||||||
|
AND zip_batch_id IS NOT NULL
|
||||||
|
AND created_at BETWEEN ($3 - INTERVAL '30 minutes') AND ($3 + INTERVAL '30 minutes')
|
||||||
|
GROUP BY zip_batch_id
|
||||||
|
ORDER BY COUNT(*) DESC, MAX(created_at) DESC
|
||||||
|
"#,
|
||||||
|
)
|
||||||
|
.bind(task.organization_id)
|
||||||
|
.bind(task.created_by)
|
||||||
|
.bind(task.created_at)
|
||||||
|
.fetch_all(&pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
|
||||||
|
|
||||||
|
if candidates.len() == 1 {
|
||||||
|
candidates[0].zip_batch_id
|
||||||
|
} else if candidates.is_empty() {
|
||||||
|
return Err((
|
||||||
|
StatusCode::BAD_REQUEST,
|
||||||
|
"La tarea no tiene zip_batch_id y no se pudo inferir un lote asociado. Vuelve a cargar el ZIP para reingestar.".to_string(),
|
||||||
|
));
|
||||||
|
} else {
|
||||||
|
return Err((
|
||||||
|
StatusCode::BAD_REQUEST,
|
||||||
|
"La tarea no tiene zip_batch_id y hay múltiples lotes candidatos; vuelve a cargar el ZIP para evitar ambigüedad.".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let assets = sqlx::query_as::<_, AssetIdRow>(
|
||||||
|
"SELECT id FROM assets WHERE organization_id = $1 AND zip_batch_id = $2 ORDER BY created_at ASC",
|
||||||
|
)
|
||||||
|
.bind(task.organization_id)
|
||||||
|
.bind(zip_batch_id)
|
||||||
|
.fetch_all(&pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
|
||||||
|
|
||||||
|
if assets.is_empty() {
|
||||||
|
return Err((
|
||||||
|
StatusCode::BAD_REQUEST,
|
||||||
|
"No se encontraron assets para ese zip_batch_id".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let pool_clone = pool.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let total = assets.len();
|
||||||
|
let mut processed = 0usize;
|
||||||
|
let mut failed = 0usize;
|
||||||
|
|
||||||
|
let _ = set_zip_rag_task_status(&pool_clone, task.id, "processing", 0, 0, 0, None).await;
|
||||||
|
|
||||||
|
for row in assets {
|
||||||
|
let ingest_result = tokio::time::timeout(
|
||||||
|
Duration::from_secs(120),
|
||||||
|
ingest_asset_for_rag_core(&pool_clone, task.organization_id, task.created_by, row.id),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match ingest_result {
|
||||||
|
Ok(Ok(_)) => {
|
||||||
|
processed += 1;
|
||||||
|
}
|
||||||
|
Ok(Err(msg)) => {
|
||||||
|
failed += 1;
|
||||||
|
tracing::warn!(
|
||||||
|
"Retry ZIP RAG: fallo ingesta asset {} en task {}: {}",
|
||||||
|
row.id,
|
||||||
|
task.id,
|
||||||
|
msg
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Err(_elapsed) => {
|
||||||
|
failed += 1;
|
||||||
|
tracing::warn!(
|
||||||
|
"Retry ZIP RAG: timeout (120s) en asset {} en task {}",
|
||||||
|
row.id,
|
||||||
|
task.id
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let progress = (((processed + failed) as f32 / total as f32) * 100.0)
|
||||||
|
.round()
|
||||||
|
.clamp(0.0, 100.0) as i32;
|
||||||
|
|
||||||
|
let _ = set_zip_rag_task_status(
|
||||||
|
&pool_clone,
|
||||||
|
task.id,
|
||||||
|
"processing",
|
||||||
|
progress,
|
||||||
|
processed,
|
||||||
|
failed,
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
let final_status = if failed > 0 { "failed" } else { "completed" };
|
||||||
|
let final_msg = if failed > 0 {
|
||||||
|
Some("Reintento RAG completado con errores parciales")
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
if let Err(e) = set_zip_rag_task_status(
|
||||||
|
&pool_clone,
|
||||||
|
task.id,
|
||||||
|
final_status,
|
||||||
|
100,
|
||||||
|
processed,
|
||||||
|
failed,
|
||||||
|
final_msg,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::error!(
|
||||||
|
"Retry ZIP RAG: no se pudo actualizar estado final de task {} a '{}': {:?}",
|
||||||
|
task.id,
|
||||||
|
final_status,
|
||||||
|
e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return Ok(StatusCode::ACCEPTED);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(StatusCode::NOT_FOUND)
|
Ok(StatusCode::NOT_FOUND)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -680,39 +680,33 @@ pub async fn delete_asset(
|
|||||||
Ok(StatusCode::NO_CONTENT)
|
Ok(StatusCode::NO_CONTENT)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// POST /api/assets/:id/ingest-rag - Ingesta un asset (PDF/audio/video/texto) en chunks para RAG
|
/// Lógica interna de ingesta RAG reutilizable (también usada en retry de tareas).
|
||||||
pub async fn ingest_asset_for_rag(
|
pub async fn ingest_asset_for_rag_core(
|
||||||
Org(org_ctx): Org,
|
pool: &PgPool,
|
||||||
claims: Claims,
|
org_id: Uuid,
|
||||||
State(pool): State<PgPool>,
|
user_id: Uuid,
|
||||||
Path(id): Path<Uuid>,
|
asset_id: Uuid,
|
||||||
) -> Result<Json<AssetRagIngestResponse>, (StatusCode, String)> {
|
) -> Result<(usize, usize), String> {
|
||||||
let asset: Asset = sqlx::query_as(
|
let asset: Asset = sqlx::query_as("SELECT * FROM assets WHERE id = $1 AND organization_id = $2")
|
||||||
"SELECT * FROM assets WHERE id = $1 AND organization_id = $2"
|
.bind(asset_id)
|
||||||
)
|
.bind(org_id)
|
||||||
.bind(id)
|
.fetch_optional(pool)
|
||||||
.bind(org_ctx.id)
|
.await
|
||||||
.fetch_optional(&pool)
|
.map_err(|e| e.to_string())?
|
||||||
.await
|
.ok_or_else(|| "Activo no encontrado".to_string())?;
|
||||||
.map_err(|e: sqlx::Error| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
|
|
||||||
.ok_or((StatusCode::NOT_FOUND, "Activo no encontrado".to_string()))?;
|
|
||||||
|
|
||||||
let extracted = extract_asset_text(&asset).await?;
|
let extracted = extract_asset_text(&asset)
|
||||||
let content = extracted.trim();
|
.await
|
||||||
|
.map_err(|(_, msg)| msg)?;
|
||||||
|
let content = extracted.trim().to_string();
|
||||||
|
|
||||||
if content.len() < 80 {
|
if content.len() < 80 {
|
||||||
return Err((
|
return Err("No se encontró suficiente texto utilizable en el archivo".to_string());
|
||||||
StatusCode::BAD_REQUEST,
|
|
||||||
"No se encontró suficiente texto utilizable en el archivo".to_string(),
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let chunks = chunk_text(content, 900);
|
let chunks = chunk_text(&content, 900);
|
||||||
if chunks.is_empty() {
|
if chunks.is_empty() {
|
||||||
return Err((
|
return Err("No se pudo generar contenido para RAG".to_string());
|
||||||
StatusCode::BAD_REQUEST,
|
|
||||||
"No se pudo generar contenido para RAG".to_string(),
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
sqlx::query(
|
sqlx::query(
|
||||||
@@ -723,11 +717,11 @@ pub async fn ingest_asset_for_rag(
|
|||||||
AND source_metadata->>'asset_id' = $2
|
AND source_metadata->>'asset_id' = $2
|
||||||
"#,
|
"#,
|
||||||
)
|
)
|
||||||
.bind(org_ctx.id)
|
.bind(org_id)
|
||||||
.bind(asset.id.to_string())
|
.bind(asset.id.to_string())
|
||||||
.execute(&pool)
|
.execute(pool)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Cleanup failed: {}", e)))?;
|
.map_err(|e| format!("Cleanup failed: {}", e))?;
|
||||||
|
|
||||||
let source_kind = if asset.mimetype.starts_with("audio/") || asset.mimetype.starts_with("video/") {
|
let source_kind = if asset.mimetype.starts_with("audio/") || asset.mimetype.starts_with("video/") {
|
||||||
"audio-transcription"
|
"audio-transcription"
|
||||||
@@ -747,16 +741,16 @@ pub async fn ingest_asset_for_rag(
|
|||||||
.danger_accept_invalid_certs(true)
|
.danger_accept_invalid_certs(true)
|
||||||
.danger_accept_invalid_hostnames(true)
|
.danger_accept_invalid_hostnames(true)
|
||||||
.build()
|
.build()
|
||||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("HTTP client error: {}", e)))?;
|
.map_err(|e| format!("HTTP client error: {}", e))?;
|
||||||
let ollama_url = ai::get_ollama_url();
|
let ollama_url = ai::get_ollama_url();
|
||||||
let model = ai::get_embedding_model();
|
let model = ai::get_embedding_model();
|
||||||
|
|
||||||
ingest_chunks_to_question_bank(
|
ingest_chunks_to_question_bank(
|
||||||
&pool,
|
pool,
|
||||||
org_ctx.id,
|
org_id,
|
||||||
claims.sub,
|
user_id,
|
||||||
&asset,
|
&asset,
|
||||||
&source_kind,
|
source_kind,
|
||||||
skill,
|
skill,
|
||||||
&chunks,
|
&chunks,
|
||||||
&client,
|
&client,
|
||||||
@@ -766,13 +760,51 @@ pub async fn ingest_asset_for_rag(
|
|||||||
None,
|
None,
|
||||||
asset.unit_number,
|
asset.unit_number,
|
||||||
)
|
)
|
||||||
.await?;
|
.await
|
||||||
|
.map_err(|(_, msg)| msg)?;
|
||||||
|
|
||||||
|
Ok((chunks.len(), content.len()))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// POST /api/assets/:id/ingest-rag - Ingesta un asset (PDF/audio/video/texto) en chunks para RAG
|
||||||
|
pub async fn ingest_asset_for_rag(
|
||||||
|
Org(org_ctx): Org,
|
||||||
|
claims: Claims,
|
||||||
|
State(pool): State<PgPool>,
|
||||||
|
Path(id): Path<Uuid>,
|
||||||
|
) -> Result<Json<AssetRagIngestResponse>, (StatusCode, String)> {
|
||||||
|
let (chunks_ingested, chars_ingested) = ingest_asset_for_rag_core(&pool, org_ctx.id, claims.sub, id)
|
||||||
|
.await
|
||||||
|
.map_err(|e| {
|
||||||
|
if e.contains("Activo no encontrado") {
|
||||||
|
(StatusCode::NOT_FOUND, e)
|
||||||
|
} else if e.contains("suficiente texto") || e.contains("generar contenido") {
|
||||||
|
(StatusCode::BAD_REQUEST, e)
|
||||||
|
} else {
|
||||||
|
(StatusCode::INTERNAL_SERVER_ERROR, e)
|
||||||
|
}
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let asset: Asset = sqlx::query_as("SELECT * FROM assets WHERE id = $1 AND organization_id = $2")
|
||||||
|
.bind(id)
|
||||||
|
.bind(org_ctx.id)
|
||||||
|
.fetch_one(&pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e: sqlx::Error| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
|
||||||
|
|
||||||
|
let source_kind = if asset.mimetype.starts_with("audio/") || asset.mimetype.starts_with("video/") {
|
||||||
|
"audio-transcription"
|
||||||
|
} else if asset.mimetype.contains("pdf") {
|
||||||
|
"pdf"
|
||||||
|
} else {
|
||||||
|
"text"
|
||||||
|
};
|
||||||
|
|
||||||
Ok(Json(AssetRagIngestResponse {
|
Ok(Json(AssetRagIngestResponse {
|
||||||
asset_id: asset.id,
|
asset_id: asset.id,
|
||||||
source: source_kind.to_string(),
|
source: source_kind.to_string(),
|
||||||
chunks_ingested: chunks.len(),
|
chunks_ingested,
|
||||||
chars_ingested: content.len(),
|
chars_ingested,
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -825,6 +857,7 @@ async fn create_zip_rag_background_task(
|
|||||||
org_id: Uuid,
|
org_id: Uuid,
|
||||||
user_id: Uuid,
|
user_id: Uuid,
|
||||||
course_id: Option<Uuid>,
|
course_id: Option<Uuid>,
|
||||||
|
zip_batch_id: Uuid,
|
||||||
total_items: usize,
|
total_items: usize,
|
||||||
) -> Result<Uuid, sqlx::Error> {
|
) -> Result<Uuid, sqlx::Error> {
|
||||||
let task_id = Uuid::new_v4();
|
let task_id = Uuid::new_v4();
|
||||||
@@ -865,7 +898,7 @@ async fn create_zip_rag_background_task(
|
|||||||
$5,
|
$5,
|
||||||
0,
|
0,
|
||||||
0,
|
0,
|
||||||
'{}'::jsonb,
|
jsonb_build_object('zip_batch_id', $6::text),
|
||||||
NOW(),
|
NOW(),
|
||||||
NOW()
|
NOW()
|
||||||
)
|
)
|
||||||
@@ -876,13 +909,14 @@ async fn create_zip_rag_background_task(
|
|||||||
.bind(user_id)
|
.bind(user_id)
|
||||||
.bind(course_title)
|
.bind(course_title)
|
||||||
.bind(total_items as i32)
|
.bind(total_items as i32)
|
||||||
|
.bind(zip_batch_id.to_string())
|
||||||
.execute(pool)
|
.execute(pool)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(task_id)
|
Ok(task_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn set_zip_rag_task_status(
|
pub async fn set_zip_rag_task_status(
|
||||||
pool: &PgPool,
|
pool: &PgPool,
|
||||||
task_id: Uuid,
|
task_id: Uuid,
|
||||||
status: &str,
|
status: &str,
|
||||||
@@ -1739,6 +1773,7 @@ pub async fn import_assets_zip(
|
|||||||
org_ctx.id,
|
org_ctx.id,
|
||||||
claims.sub,
|
claims.sub,
|
||||||
course_id,
|
course_id,
|
||||||
|
zip_batch_id,
|
||||||
queued_count,
|
queued_count,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
|
|||||||
@@ -169,7 +169,7 @@ export default function BackgroundTasksPage() {
|
|||||||
<div className="text-xs text-gray-400">({format(new Date(task.updated_at), 'yyyy')})</div>
|
<div className="text-xs text-gray-400">({format(new Date(task.updated_at), 'yyyy')})</div>
|
||||||
</td>
|
</td>
|
||||||
<td className="px-6 py-4 text-right space-x-2">
|
<td className="px-6 py-4 text-right space-x-2">
|
||||||
{task.task_type === 'lesson_transcription' && task.status === 'failed' && (
|
{(task.task_type === 'lesson_transcription' || task.task_type === 'zip_rag_import') && task.status === 'failed' && (
|
||||||
<button
|
<button
|
||||||
onClick={() => handleRetry(task.id)}
|
onClick={() => handleRetry(task.id)}
|
||||||
disabled={actionLoading === task.id}
|
disabled={actionLoading === task.id}
|
||||||
|
|||||||
@@ -926,7 +926,7 @@ export const apiFetch = (url: string, options: ApiFetchOptions = {}, isLms: bool
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Handle no-content responses
|
// Handle no-content responses
|
||||||
if (res.status === 204) return;
|
if (res.status === 204 || res.status === 202) return;
|
||||||
return res.json();
|
return res.json();
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|||||||
File diff suppressed because one or more lines are too long
Reference in New Issue
Block a user