feat: fix importation from remote DB
This commit is contained in:
@@ -48,10 +48,15 @@ pub async fn save_mysql_courses_and_plans(
|
|||||||
plans: Vec<MySqlPlanInfo>,
|
plans: Vec<MySqlPlanInfo>,
|
||||||
courses: Vec<MySqlCourseInfo>,
|
courses: Vec<MySqlCourseInfo>,
|
||||||
) -> Result<(), String> {
|
) -> Result<(), String> {
|
||||||
|
let plans_count = plans.len();
|
||||||
|
let courses_count = courses.len();
|
||||||
|
tracing::info!("Saving {} study plans and {} courses from MySQL", plans_count, courses_count);
|
||||||
|
|
||||||
// Save study plans first
|
// Save study plans first
|
||||||
for plan in plans {
|
for plan in plans {
|
||||||
let course_type = calculate_course_type(&plan.nombre_plan);
|
let course_type = calculate_course_type(&plan.nombre_plan);
|
||||||
|
tracing::debug!("Saving study plan: {} (ID: {})", plan.nombre_plan, plan.id_plan_de_estudios);
|
||||||
|
|
||||||
sqlx::query(
|
sqlx::query(
|
||||||
r#"
|
r#"
|
||||||
INSERT INTO mysql_study_plans (mysql_id, organization_id, name, course_type)
|
INSERT INTO mysql_study_plans (mysql_id, organization_id, name, course_type)
|
||||||
@@ -70,13 +75,14 @@ pub async fn save_mysql_courses_and_plans(
|
|||||||
.await
|
.await
|
||||||
.map_err(|e| format!("Failed to save study plan: {}", e))?;
|
.map_err(|e| format!("Failed to save study plan: {}", e))?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save courses
|
// Save courses
|
||||||
for course in courses {
|
for course in courses {
|
||||||
// Determine course_type from duration (40h = regular, 80h = intensive)
|
// Determine course_type from duration (40h = regular, 80h = intensive)
|
||||||
let course_type = calculate_course_type_from_duration(course.duracion);
|
let course_type = calculate_course_type_from_duration(course.duracion);
|
||||||
let level_calculated = calculate_course_level(course.nivel_curso);
|
let level_calculated = calculate_course_level(course.nivel_curso);
|
||||||
|
tracing::debug!("Saving course: {} (ID: {}, Plan ID: {})", course.nombre_curso, course.id_cursos, course.id_plan_de_estudios);
|
||||||
|
|
||||||
// Get study_plan_id from mysql_study_plans
|
// Get study_plan_id from mysql_study_plans
|
||||||
let study_plan_id: i32 = sqlx::query_scalar(
|
let study_plan_id: i32 = sqlx::query_scalar(
|
||||||
"SELECT id FROM mysql_study_plans WHERE mysql_id = $1 AND organization_id = $2"
|
"SELECT id FROM mysql_study_plans WHERE mysql_id = $1 AND organization_id = $2"
|
||||||
@@ -86,7 +92,7 @@ pub async fn save_mysql_courses_and_plans(
|
|||||||
.fetch_one(pool)
|
.fetch_one(pool)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| format!("Failed to find study plan: {}", e))?;
|
.map_err(|e| format!("Failed to find study plan: {}", e))?;
|
||||||
|
|
||||||
sqlx::query(
|
sqlx::query(
|
||||||
r#"
|
r#"
|
||||||
INSERT INTO mysql_courses (
|
INSERT INTO mysql_courses (
|
||||||
@@ -115,7 +121,8 @@ pub async fn save_mysql_courses_and_plans(
|
|||||||
.await
|
.await
|
||||||
.map_err(|e| format!("Failed to save course: {}", e))?;
|
.map_err(|e| format!("Failed to save course: {}", e))?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tracing::info!("Successfully saved {} study plans and {} courses", plans_count, courses_count);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -391,6 +398,8 @@ pub async fn import_from_mysql(
|
|||||||
.await
|
.await
|
||||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to fetch plans: {}", e)))?;
|
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to fetch plans: {}", e)))?;
|
||||||
|
|
||||||
|
tracing::info!("Fetched {} study plans from MySQL", mysql_plans.len());
|
||||||
|
|
||||||
let mysql_courses: Vec<MySqlCourseInfo> = sqlx::query_as(
|
let mysql_courses: Vec<MySqlCourseInfo> = sqlx::query_as(
|
||||||
r#"
|
r#"
|
||||||
SELECT DISTINCT
|
SELECT DISTINCT
|
||||||
@@ -411,13 +420,17 @@ pub async fn import_from_mysql(
|
|||||||
.await
|
.await
|
||||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to fetch courses: {}", e)))?;
|
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to fetch courses: {}", e)))?;
|
||||||
|
|
||||||
|
tracing::info!("Fetched {} courses from MySQL", mysql_courses.len());
|
||||||
|
|
||||||
// Save plans and courses to PostgreSQL
|
// Save plans and courses to PostgreSQL
|
||||||
|
tracing::info!("Saving plans and courses to PostgreSQL...");
|
||||||
save_mysql_courses_and_plans(&pool, org_ctx.id, mysql_plans, mysql_courses)
|
save_mysql_courses_and_plans(&pool, org_ctx.id, mysql_plans, mysql_courses)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to save courses/plans: {}", e)))?;
|
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to save courses/plans: {}", e)))?;
|
||||||
|
|
||||||
// Fetch questions from MySQL
|
// Fetch questions from MySQL
|
||||||
let mysql_questions: Vec<MySqlQuestion> = if payload.import_all.unwrap_or(false) {
|
let mysql_questions: Vec<MySqlQuestion> = if payload.import_all.unwrap_or(false) {
|
||||||
|
// Import ALL questions (no limit)
|
||||||
sqlx::query_as(
|
sqlx::query_as(
|
||||||
r#"
|
r#"
|
||||||
SELECT bp.idPregunta, bp.descripcion, bp.idTipoPregunta, bp.activo,
|
SELECT bp.idPregunta, bp.descripcion, bp.idTipoPregunta, bp.activo,
|
||||||
@@ -428,13 +441,14 @@ pub async fn import_from_mysql(
|
|||||||
WHERE bp.activo = 1
|
WHERE bp.activo = 1
|
||||||
AND c.Activo = 1
|
AND c.Activo = 1
|
||||||
AND pe.Activo = 1
|
AND pe.Activo = 1
|
||||||
LIMIT 200
|
ORDER BY bp.idPregunta
|
||||||
"#
|
"#
|
||||||
)
|
)
|
||||||
.fetch_all(&mysql_pool)
|
.fetch_all(&mysql_pool)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to fetch questions: {}", e)))?
|
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to fetch questions: {}", e)))?
|
||||||
} else if let Some(course_id) = payload.mysql_course_id {
|
} else if let Some(course_id) = payload.mysql_course_id {
|
||||||
|
// Import all questions for a specific course (no limit)
|
||||||
sqlx::query_as(
|
sqlx::query_as(
|
||||||
r#"
|
r#"
|
||||||
SELECT bp.idPregunta, bp.descripcion, bp.idTipoPregunta, bp.activo,
|
SELECT bp.idPregunta, bp.descripcion, bp.idTipoPregunta, bp.activo,
|
||||||
@@ -445,7 +459,7 @@ pub async fn import_from_mysql(
|
|||||||
WHERE bp.idCursos = ? AND bp.activo = 1
|
WHERE bp.idCursos = ? AND bp.activo = 1
|
||||||
AND c.Activo = 1
|
AND c.Activo = 1
|
||||||
AND pe.Activo = 1
|
AND pe.Activo = 1
|
||||||
LIMIT 100
|
ORDER BY bp.idPregunta
|
||||||
"#
|
"#
|
||||||
)
|
)
|
||||||
.bind(course_id)
|
.bind(course_id)
|
||||||
@@ -841,11 +855,56 @@ pub async fn import_all_from_mysql(
|
|||||||
// Connect to MySQL
|
// Connect to MySQL
|
||||||
let mysql_url = std::env::var("MYSQL_DATABASE_URL")
|
let mysql_url = std::env::var("MYSQL_DATABASE_URL")
|
||||||
.map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "MYSQL_DATABASE_URL not configured".to_string()))?;
|
.map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "MYSQL_DATABASE_URL not configured".to_string()))?;
|
||||||
|
|
||||||
let mysql_pool = sqlx::MySqlPool::connect(&mysql_url)
|
let mysql_pool = sqlx::MySqlPool::connect(&mysql_url)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to connect to MySQL: {}", e)))?;
|
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to connect to MySQL: {}", e)))?;
|
||||||
|
|
||||||
|
// Fetch all study plans and courses from MySQL to sync them
|
||||||
|
let mysql_plans: Vec<MySqlPlanInfo> = sqlx::query_as(
|
||||||
|
r#"
|
||||||
|
SELECT DISTINCT
|
||||||
|
pe.idPlanDeEstudios AS id_plan_de_estudios,
|
||||||
|
pe.Nombre AS nombre_plan
|
||||||
|
FROM plandeestudios pe
|
||||||
|
WHERE pe.Activo = 1
|
||||||
|
ORDER BY pe.Nombre
|
||||||
|
"#
|
||||||
|
)
|
||||||
|
.fetch_all(&mysql_pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to fetch plans: {}", e)))?;
|
||||||
|
|
||||||
|
tracing::info!("Fetched {} study plans from MySQL", mysql_plans.len());
|
||||||
|
|
||||||
|
let mysql_courses: Vec<MySqlCourseInfo> = sqlx::query_as(
|
||||||
|
r#"
|
||||||
|
SELECT DISTINCT
|
||||||
|
c.idCursos AS id_cursos,
|
||||||
|
c.NombreCurso AS nombre_curso,
|
||||||
|
c.NivelCurso AS nivel_curso,
|
||||||
|
pe.idPlanDeEstudios AS id_plan_de_estudios,
|
||||||
|
pe.Nombre AS nombre_plan,
|
||||||
|
CAST(c.Duracion AS SIGNED INTEGER) AS duracion
|
||||||
|
FROM curso c
|
||||||
|
JOIN plandeestudios pe ON c.idPlanDeEstudios = pe.idPlanDeEstudios
|
||||||
|
WHERE c.Activo = 1
|
||||||
|
AND pe.Activo = 1
|
||||||
|
ORDER BY pe.Nombre, c.NivelCurso
|
||||||
|
"#
|
||||||
|
)
|
||||||
|
.fetch_all(&mysql_pool)
|
||||||
|
.await
|
||||||
|
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to fetch courses: {}", e)))?;
|
||||||
|
|
||||||
|
tracing::info!("Fetched {} courses from MySQL", mysql_courses.len());
|
||||||
|
|
||||||
|
// Save plans and courses to PostgreSQL
|
||||||
|
tracing::info!("Saving plans and courses to PostgreSQL...");
|
||||||
|
save_mysql_courses_and_plans(&pool, org_ctx.id, mysql_plans, mysql_courses)
|
||||||
|
.await
|
||||||
|
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to save courses/plans: {}", e)))?;
|
||||||
|
|
||||||
// Fetch ALL questions from MySQL with answers (using JSON aggregation for answers)
|
// Fetch ALL questions from MySQL with answers (using JSON aggregation for answers)
|
||||||
let mysql_questions: Vec<MySqlQuestionFull> = sqlx::query_as(
|
let mysql_questions: Vec<MySqlQuestionFull> = sqlx::query_as(
|
||||||
r#"
|
r#"
|
||||||
@@ -882,7 +941,6 @@ pub async fn import_all_from_mysql(
|
|||||||
AND pe.Activo = 1
|
AND pe.Activo = 1
|
||||||
AND c.Activo = 1
|
AND c.Activo = 1
|
||||||
ORDER BY pe.Nombre, c.NombreCurso, bp.idPregunta
|
ORDER BY pe.Nombre, c.NombreCurso, bp.idPregunta
|
||||||
LIMIT 500
|
|
||||||
"#
|
"#
|
||||||
)
|
)
|
||||||
.fetch_all(&mysql_pool)
|
.fetch_all(&mysql_pool)
|
||||||
|
|||||||
@@ -732,6 +732,7 @@ pub async fn generate_questions_with_rag(
|
|||||||
} else if let Some(course_id) = payload.course_id {
|
} else if let Some(course_id) = payload.course_id {
|
||||||
// Fetch questions from imported MySQL questions in PostgreSQL question_bank
|
// Fetch questions from imported MySQL questions in PostgreSQL question_bank
|
||||||
// Filter by course_id if provided (mysql_course_id from imported metadata)
|
// Filter by course_id if provided (mysql_course_id from imported metadata)
|
||||||
|
// NO LIMIT - fetch all questions for better RAG context
|
||||||
mysql_questions = sqlx::query_as(
|
mysql_questions = sqlx::query_as(
|
||||||
r#"
|
r#"
|
||||||
SELECT
|
SELECT
|
||||||
@@ -753,7 +754,7 @@ pub async fn generate_questions_with_rag(
|
|||||||
WHERE qb.organization_id = $1
|
WHERE qb.organization_id = $1
|
||||||
AND qb.source = 'imported-mysql'
|
AND qb.source = 'imported-mysql'
|
||||||
AND (qb.source_metadata->>'idCursos')::integer = $2
|
AND (qb.source_metadata->>'idCursos')::integer = $2
|
||||||
LIMIT 20
|
ORDER BY qb.created_at DESC
|
||||||
"#
|
"#
|
||||||
)
|
)
|
||||||
.bind(org_ctx.id)
|
.bind(org_ctx.id)
|
||||||
@@ -763,6 +764,7 @@ pub async fn generate_questions_with_rag(
|
|||||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to fetch questions: {}", e)))?;
|
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to fetch questions: {}", e)))?;
|
||||||
} else {
|
} else {
|
||||||
// Fetch all imported MySQL questions for this organization
|
// Fetch all imported MySQL questions for this organization
|
||||||
|
// NO LIMIT - fetch all questions for better RAG context
|
||||||
mysql_questions = sqlx::query_as(
|
mysql_questions = sqlx::query_as(
|
||||||
r#"
|
r#"
|
||||||
SELECT
|
SELECT
|
||||||
@@ -783,7 +785,7 @@ pub async fn generate_questions_with_rag(
|
|||||||
FROM question_bank qb
|
FROM question_bank qb
|
||||||
WHERE qb.organization_id = $1
|
WHERE qb.organization_id = $1
|
||||||
AND qb.source = 'imported-mysql'
|
AND qb.source = 'imported-mysql'
|
||||||
LIMIT 20
|
ORDER BY qb.created_at DESC
|
||||||
"#
|
"#
|
||||||
)
|
)
|
||||||
.bind(org_ctx.id)
|
.bind(org_ctx.id)
|
||||||
|
|||||||
Reference in New Issue
Block a user