feat: implementing embedding AI

This commit is contained in:
2026-03-18 17:15:39 -03:00
parent e8cdf61468
commit 64d3d5be91
32 changed files with 3568 additions and 174 deletions
@@ -0,0 +1,364 @@
//! Handlers for PGVector embeddings in Question Bank
//! Enables semantic search and RAG with AI-powered embeddings
use axum::{
Json,
extract::{Path, Query, State},
http::StatusCode,
};
use common::ai::{self, generate_embedding};
use common::models::QuestionBank;
use common::middleware::Org;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use uuid::Uuid;
// ==================== Query Parameters ====================
#[derive(Debug, Deserialize)]
pub struct SemanticSearchFilters {
pub query: String,
pub limit: Option<i32>,
pub threshold: Option<f64>,
pub question_type: Option<String>,
pub difficulty: Option<String>,
}
#[derive(Debug, Serialize, Deserialize, sqlx::FromRow)]
pub struct SemanticSearchResult {
pub id: Uuid,
pub question_text: String,
pub question_type: String,
pub similarity: f64, // PostgreSQL vector similarity returns double precision
pub tags: Option<Vec<String>>,
pub difficulty: Option<String>,
pub points: i32,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct GenerateEmbeddingsResult {
pub processed: i32,
pub failed: i32,
pub duration_ms: u64,
}
// ==================== Generate Embeddings ====================
/// POST /api/question-bank/embeddings/generate - Generate embeddings for all questions without them
pub async fn generate_question_embeddings(
Org(org_ctx): Org,
State(pool): State<PgPool>,
) -> Result<Json<GenerateEmbeddingsResult>, (StatusCode, String)> {
let start = std::time::Instant::now();
// Create client that accepts invalid certificates (for dev with self-signed certs)
let client = reqwest::Client::builder()
.danger_accept_invalid_certs(true)
.danger_accept_invalid_hostnames(true)
.build()
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("HTTP client error: {}", e)))?;
let ollama_url = ai::get_ollama_url();
let model = ai::get_embedding_model();
// Get questions without embeddings
let questions: Vec<QuestionBank> = sqlx::query_as(
r#"
SELECT * FROM question_bank
WHERE organization_id = $1
AND (embedding IS NULL OR embedding_updated_at IS NULL)
ORDER BY created_at DESC
LIMIT 100
"#
)
.bind(org_ctx.id)
.fetch_all(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let total = questions.len();
let mut processed = 0;
let mut failed = 0;
for question in questions {
// Generate embedding text (combine question + options + explanation)
let mut embedding_text = question.question_text.clone();
if let Some(options) = &question.options {
if let Some(opts_str) = options.as_str() {
embedding_text.push_str(" ");
embedding_text.push_str(opts_str);
} else if let Some(opts_arr) = options.as_array() {
for opt in opts_arr {
if let Some(opt_str) = opt.as_str() {
embedding_text.push_str(" ");
embedding_text.push_str(opt_str);
}
}
}
}
if let Some(explanation) = &question.explanation {
embedding_text.push_str(" ");
embedding_text.push_str(explanation);
}
// Generate embedding
match generate_embedding(&client, &ollama_url, &model, &embedding_text).await {
Ok(response) => {
let pgvector = ai::embedding_to_pgvector(&response.embedding);
// Update question with embedding
let result: Result<(i64,), sqlx::Error> = sqlx::query_as(
r#"
UPDATE question_bank
SET embedding = $1::vector,
embedding_updated_at = NOW()
WHERE id = $2
RETURNING 1
"#
)
.bind(&pgvector)
.bind(question.id)
.fetch_one(&pool)
.await;
match result {
Ok(_) => {
processed += 1;
tracing::debug!("Generated embedding for question {}", question.id);
}
Err(e) => {
failed += 1;
tracing::error!("Failed to update embedding for question {}: {}", question.id, e);
}
}
}
Err(e) => {
tracing::error!("Failed to generate embedding for question {}: {}", question.id, e);
failed += 1;
}
}
}
let duration_ms = start.elapsed().as_millis() as u64;
tracing::info!(
"Generated embeddings: {} processed, {} failed in {}ms",
processed,
failed,
duration_ms
);
Ok(Json(GenerateEmbeddingsResult {
processed,
failed,
duration_ms,
}))
}
/// POST /api/question-bank/:id/embedding/regenerate - Regenerate embedding for a specific question
pub async fn regenerate_question_embedding(
Org(org_ctx): Org,
Path(question_id): Path<Uuid>,
State(pool): State<PgPool>,
) -> Result<StatusCode, (StatusCode, String)> {
// Create client that accepts invalid certificates
let client = reqwest::Client::builder()
.danger_accept_invalid_certs(true)
.danger_accept_invalid_hostnames(true)
.build()
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("HTTP client error: {}", e)))?;
let ollama_url = ai::get_ollama_url();
let model = ai::get_embedding_model();
// Get question
let question: QuestionBank = sqlx::query_as(
"SELECT * FROM question_bank WHERE id = $1 AND organization_id = $2"
)
.bind(question_id)
.bind(org_ctx.id)
.fetch_optional(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.ok_or((StatusCode::NOT_FOUND, "Question not found".to_string()))?;
// Generate embedding text
let mut embedding_text = question.question_text.clone();
if let Some(options) = &question.options {
if let Some(opts_str) = options.as_str() {
embedding_text.push_str(" ");
embedding_text.push_str(opts_str);
} else if let Some(opts_arr) = options.as_array() {
for opt in opts_arr {
if let Some(opt_str) = opt.as_str() {
embedding_text.push_str(" ");
embedding_text.push_str(opt_str);
}
}
}
}
if let Some(explanation) = &question.explanation {
embedding_text.push_str(" ");
embedding_text.push_str(explanation);
}
// Generate embedding
let response = generate_embedding(&client, &ollama_url, &model, &embedding_text)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("AI error: {}", e)))?;
let pgvector = ai::embedding_to_pgvector(&response.embedding);
// Update question
sqlx::query(
r#"
UPDATE question_bank
SET embedding = $1::vector,
embedding_updated_at = NOW()
WHERE id = $2
"#
)
.bind(&pgvector)
.bind(question_id)
.execute(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
Ok(StatusCode::OK)
}
// ==================== Semantic Search ====================
/// GET /api/question-bank/semantic-search - Search questions by semantic similarity
pub async fn semantic_search(
Org(org_ctx): Org,
State(pool): State<PgPool>,
Query(filters): Query<SemanticSearchFilters>,
) -> Result<Json<Vec<SemanticSearchResult>>, (StatusCode, String)> {
// Create client that accepts invalid certificates
let client = reqwest::Client::builder()
.danger_accept_invalid_certs(true)
.danger_accept_invalid_hostnames(true)
.build()
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("HTTP client error: {}", e)))?;
let ollama_url = ai::get_ollama_url();
let model = ai::get_embedding_model();
// Generate embedding for query
let embedding_response = generate_embedding(&client, &ollama_url, &model, &filters.query)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("AI error: {}", e)))?;
let pgvector = ai::embedding_to_pgvector(&embedding_response.embedding);
let limit = filters.limit.unwrap_or(20);
let threshold = filters.threshold.unwrap_or(0.5);
// Build query with optional filters
let mut query = String::from(
r#"
SELECT
id,
question_text,
question_type::text,
1 - (embedding <=> $1::vector) AS similarity,
tags,
difficulty,
points
FROM question_bank
WHERE organization_id = $2
AND embedding IS NOT NULL
AND 1 - (embedding <=> $1::vector) >= $3
"#
);
let mut param_idx = 3;
if let Some(ref question_type) = filters.question_type {
param_idx += 1;
query.push_str(&format!(" AND question_type::text = ${}", param_idx));
}
if let Some(ref difficulty) = filters.difficulty {
param_idx += 1;
query.push_str(&format!(" AND difficulty = ${}", param_idx));
}
param_idx += 1;
query.push_str(&format!(" ORDER BY embedding <=> $1::vector LIMIT ${}", param_idx));
let mut sql_query = sqlx::query_as::<_, SemanticSearchResult>(&query)
.bind(&pgvector)
.bind(org_ctx.id)
.bind(threshold);
if let Some(ref question_type) = filters.question_type {
sql_query = sql_query.bind(question_type);
}
if let Some(ref difficulty) = filters.difficulty {
sql_query = sql_query.bind(difficulty);
}
sql_query = sql_query.bind(limit);
let results = sql_query
.fetch_all(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
Ok(Json(results))
}
/// GET /api/question-bank/similar/:id - Find questions similar to a given question
pub async fn find_similar_questions(
Org(org_ctx): Org,
Path(question_id): Path<Uuid>,
Query(params): Query<SimilarityParams>,
State(pool): State<PgPool>,
) -> Result<Json<Vec<SemanticSearchResult>>, (StatusCode, String)> {
let threshold = params.threshold.unwrap_or(0.85);
let limit = params.limit.unwrap_or(10);
let results = sqlx::query_as::<_, SemanticSearchResult>(
r#"
SELECT
id,
question_text,
question_type::text,
1 - (embedding <=> (SELECT embedding FROM question_bank WHERE id = $1)) AS similarity,
tags,
difficulty,
points
FROM question_bank
WHERE id != $1
AND organization_id = $2
AND embedding IS NOT NULL
ORDER BY embedding <=> (SELECT embedding FROM question_bank WHERE id = $1)
LIMIT $3
"#
)
.bind(question_id)
.bind(org_ctx.id)
.bind(limit)
.fetch_all(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.into_iter()
.filter(|r| r.similarity >= threshold)
.collect();
Ok(Json(results))
}
#[derive(Debug, Deserialize)]
pub struct SimilarityParams {
pub threshold: Option<f64>,
pub limit: Option<i32>,
}
@@ -12,6 +12,142 @@ use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use uuid::Uuid;
// ==================== MySQL Study Plans & Courses ====================
#[derive(Debug, sqlx::FromRow, Serialize, Deserialize)]
pub struct MySqlStudyPlan {
pub id: i32,
pub mysql_id: i32,
pub organization_id: Uuid,
pub name: String,
pub course_type: String,
pub is_active: bool,
pub created_at: chrono::DateTime<chrono::Utc>,
pub updated_at: chrono::DateTime<chrono::Utc>,
}
#[derive(Debug, sqlx::FromRow, Serialize, Deserialize)]
pub struct MySqlCourse {
pub id: i32,
pub mysql_id: i32,
pub organization_id: Uuid,
pub study_plan_id: i32,
pub name: String,
pub level: Option<i32>,
pub course_type: String,
pub level_calculated: Option<String>,
pub is_active: bool,
pub created_at: chrono::DateTime<chrono::Utc>,
pub updated_at: chrono::DateTime<chrono::Utc>,
}
/// Save or update study plans and courses from MySQL during import
pub async fn save_mysql_courses_and_plans(
pool: &PgPool,
org_id: Uuid,
plans: Vec<MySqlPlanInfo>,
courses: Vec<MySqlCourseInfo>,
) -> Result<(), String> {
// Save study plans first
for plan in plans {
let course_type = calculate_course_type(&plan.nombre_plan);
sqlx::query(
r#"
INSERT INTO mysql_study_plans (mysql_id, organization_id, name, course_type)
VALUES ($1, $2, $3, $4)
ON CONFLICT (mysql_id) DO UPDATE SET
name = EXCLUDED.name,
course_type = EXCLUDED.course_type,
updated_at = NOW()
"#
)
.bind(plan.id_plan_de_estudios)
.bind(org_id)
.bind(&plan.nombre_plan)
.bind(&course_type)
.execute(pool)
.await
.map_err(|e| format!("Failed to save study plan: {}", e))?;
}
// Save courses
for course in courses {
// Determine course_type from duration (40h = regular, 80h = intensive)
let course_type = calculate_course_type_from_duration(course.duracion);
let level_calculated = calculate_course_level(course.nivel_curso);
// Get study_plan_id from mysql_study_plans
let study_plan_id: i32 = sqlx::query_scalar(
"SELECT id FROM mysql_study_plans WHERE mysql_id = $1 AND organization_id = $2"
)
.bind(course.id_plan_de_estudios)
.bind(org_id)
.fetch_one(pool)
.await
.map_err(|e| format!("Failed to find study plan: {}", e))?;
sqlx::query(
r#"
INSERT INTO mysql_courses (
mysql_id, organization_id, study_plan_id, name, level, duracion,
course_type, level_calculated
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (mysql_id) DO UPDATE SET
name = EXCLUDED.name,
level = EXCLUDED.level,
duracion = EXCLUDED.duracion,
course_type = EXCLUDED.course_type,
level_calculated = EXCLUDED.level_calculated,
updated_at = NOW()
"#
)
.bind(course.id_cursos)
.bind(org_id)
.bind(study_plan_id)
.bind(&course.nombre_curso)
.bind(course.nivel_curso)
.bind(course.duracion)
.bind(&course_type)
.bind(&level_calculated)
.execute(pool)
.await
.map_err(|e| format!("Failed to save course: {}", e))?;
}
Ok(())
}
fn calculate_course_type(plan_name: &str) -> String {
let plan_lower = plan_name.to_lowercase();
if plan_lower.contains("intensive") || plan_lower.contains("intensivo") {
"intensive".to_string()
} else {
"regular".to_string()
}
}
fn calculate_course_type_from_duration(duracion: Option<i32>) -> String {
match duracion {
Some(d) if d >= 70 => "intensive".to_string(), // 80h or more = intensive
_ => "regular".to_string(), // 40h or less = regular
}
}
fn calculate_course_level(nivel: Option<i32>) -> String {
match nivel {
None => "intermediate".to_string(),
Some(n) if n <= 2 => "beginner".to_string(),
Some(n) if n <= 4 => "beginner_1".to_string(),
Some(n) if n <= 6 => "beginner_2".to_string(),
Some(n) if n <= 8 => "intermediate".to_string(),
Some(n) if n <= 10 => "intermediate_1".to_string(),
Some(n) if n <= 12 => "intermediate_2".to_string(),
Some(_) => "advanced".to_string(),
}
}
// ==================== Create ====================
/// POST /api/question-bank - Create a new question in the bank
@@ -239,7 +375,47 @@ pub async fn import_from_mysql(
let mysql_pool = sqlx::MySqlPool::connect(&mysql_url)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to connect to MySQL: {}", e)))?;
// Fetch all study plans and courses from MySQL to sync them
let mysql_plans: Vec<MySqlPlanInfo> = sqlx::query_as(
r#"
SELECT DISTINCT
pe.idPlanDeEstudios AS id_plan_de_estudios,
pe.Nombre AS nombre_plan
FROM plandeestudios pe
WHERE pe.Activo = 1
ORDER BY pe.Nombre
"#
)
.fetch_all(&mysql_pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to fetch plans: {}", e)))?;
let mysql_courses: Vec<MySqlCourseInfo> = sqlx::query_as(
r#"
SELECT DISTINCT
c.idCursos AS id_cursos,
c.NombreCurso AS nombre_curso,
c.NivelCurso AS nivel_curso,
pe.idPlanDeEstudios AS id_plan_de_estudios,
pe.Nombre AS nombre_plan,
CAST(c.Duracion AS SIGNED INTEGER) AS duracion
FROM curso c
JOIN plandeestudios pe ON c.idPlanDeEstudios = pe.idPlanDeEstudios
WHERE c.Activo = 1
AND pe.Activo = 1
ORDER BY pe.Nombre, c.NivelCurso
"#
)
.fetch_all(&mysql_pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to fetch courses: {}", e)))?;
// Save plans and courses to PostgreSQL
save_mysql_courses_and_plans(&pool, org_ctx.id, mysql_plans, mysql_courses)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to save courses/plans: {}", e)))?;
// Fetch questions from MySQL
let mysql_questions: Vec<MySqlQuestion> = if payload.import_all.unwrap_or(false) {
sqlx::query_as(
@@ -250,6 +426,8 @@ pub async fn import_from_mysql(
JOIN curso c ON bp.idCursos = c.idCursos
JOIN plandeestudios pe ON bp.idPlanDeEstudios = pe.idPlanDeEstudios
WHERE bp.activo = 1
AND c.Activo = 1
AND pe.Activo = 1
LIMIT 200
"#
)
@@ -265,6 +443,8 @@ pub async fn import_from_mysql(
JOIN curso c ON bp.idCursos = c.idCursos
JOIN plandeestudios pe ON bp.idPlanDeEstudios = pe.idPlanDeEstudios
WHERE bp.idCursos = ? AND bp.activo = 1
AND c.Activo = 1
AND pe.Activo = 1
LIMIT 100
"#
)
@@ -285,6 +465,8 @@ pub async fn import_from_mysql(
JOIN curso c ON bp.idCursos = c.idCursos
JOIN plandeestudios pe ON bp.idPlanDeEstudios = pe.idPlanDeEstudios
WHERE bp.idPregunta = ? AND bp.activo = 1
AND c.Activo = 1
AND pe.Activo = 1
"#
)
.bind(q_id)
@@ -555,16 +737,18 @@ pub async fn list_mysql_courses(
// Fetch courses with their plan names
let courses: Vec<MySqlCourseInfo> = sqlx::query_as(
r#"
SELECT DISTINCT
c.idCursos,
c.NombreCurso,
c.NivelCurso,
pe.idPlanDeEstudios,
pe.Nombre as NombrePlan
SELECT DISTINCT
c.idCursos AS id_cursos,
c.NombreCurso AS nombre_curso,
c.NivelCurso AS nivel_curso,
pe.idPlanDeEstudios AS id_plan_de_estudios,
pe.Nombre AS nombre_plan,
CAST(c.Duracion AS SIGNED INTEGER) AS duracion
FROM curso c
JOIN plandeestudios pe ON c.idPlanDeEstudios = pe.idPlanDeEstudios
WHERE c.Activo = 1
ORDER BY pe.Nombre, c.NombreCurso
AND pe.Activo = 1
ORDER BY pe.Nombre, c.NivelCurso
"#
)
.fetch_all(&mysql_pool)
@@ -576,6 +760,78 @@ pub async fn list_mysql_courses(
Ok(Json(courses))
}
/// GET /api/question-bank/mysql-plans - Get all study plans from PostgreSQL (imported from MySQL)
pub async fn get_mysql_plans(
Org(org_ctx): Org,
State(pool): State<PgPool>,
) -> Result<Json<Vec<MySqlPlanInfo>>, (StatusCode, String)> {
// Fetch all study plans from PostgreSQL
let plans: Vec<MySqlPlanInfo> = sqlx::query_as(
r#"
SELECT
mysql_id as "idPlanDeEstudios",
name as "NombrePlan"
FROM mysql_study_plans
WHERE organization_id = $1 AND is_active = true
ORDER BY name
"#
)
.bind(org_ctx.id)
.fetch_all(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to fetch plans: {}", e)))?;
Ok(Json(plans))
}
/// GET /api/question-bank/mysql-courses - Get courses filtered by plan from PostgreSQL
pub async fn get_mysql_courses_by_plan(
Org(org_ctx): Org,
State(pool): State<PgPool>,
Query(filters): Query<MySqlCoursesFilters>,
) -> Result<Json<Vec<MySqlCourseInfo>>, (StatusCode, String)> {
// Fetch courses filtered by plan from PostgreSQL
let courses: Vec<MySqlCourseInfo> = sqlx::query_as(
r#"
SELECT
c.mysql_id as "idCursos",
c.name as "NombreCurso",
c.level as "NivelCurso",
sp.mysql_id as "idPlanDeEstudios",
sp.name as "NombrePlan",
c.duracion as "Duracion"
FROM mysql_courses c
JOIN mysql_study_plans sp ON c.study_plan_id = sp.id
WHERE c.organization_id = $1
AND c.is_active = true
AND sp.mysql_id = $2
ORDER BY c.level
"#
)
.bind(org_ctx.id)
.bind(filters.plan_id)
.fetch_all(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to fetch courses: {}", e)))?;
Ok(Json(courses))
}
#[derive(Debug, Deserialize)]
pub struct MySqlCoursesFilters {
pub plan_id: i32,
}
#[derive(Debug, sqlx::FromRow, Serialize)]
pub struct MySqlPlanInfo {
#[sqlx(rename = "idPlanDeEstudios")]
#[serde(rename = "idPlanDeEstudios")]
pub id_plan_de_estudios: i32,
#[sqlx(rename = "NombrePlan")]
#[serde(rename = "NombrePlan")]
pub nombre_plan: String,
}
/// POST /api/question-bank/import-mysql-all - Import ALL questions from MySQL (bulk import)
pub async fn import_all_from_mysql(
Org(org_ctx): Org,
@@ -623,6 +879,8 @@ pub async fn import_all_from_mysql(
JOIN plandeestudios pe ON bp.idPlanDeEstudios = pe.idPlanDeEstudios
JOIN tipopregunta tp ON bp.idTipoPregunta = tp.idTipoPregunta
WHERE bp.activo = 1
AND pe.Activo = 1
AND c.Activo = 1
ORDER BY pe.Nombre, c.NombreCurso, bp.idPregunta
LIMIT 500
"#
@@ -754,11 +1012,24 @@ pub struct ImportResult {
#[derive(Debug, sqlx::FromRow, Serialize, Deserialize)]
pub struct MySqlCourseInfo {
#[sqlx(rename = "idCursos")]
#[serde(rename = "idCursos")]
pub id_cursos: i32,
#[sqlx(rename = "NombreCurso")]
#[serde(rename = "NombreCurso")]
pub nombre_curso: String,
#[sqlx(rename = "NivelCurso")]
#[serde(rename = "NivelCurso", skip_serializing_if = "Option::is_none")]
pub nivel_curso: Option<i32>,
#[sqlx(rename = "idPlanDeEstudios")]
#[serde(rename = "idPlanDeEstudios")]
pub id_plan_de_estudios: i32,
#[sqlx(rename = "NombrePlan")]
#[serde(rename = "NombrePlan")]
pub nombre_plan: String,
#[sqlx(rename = "Duracion")]
#[serde(rename = "Duracion", skip_serializing_if = "Option::is_none")]
pub duracion: Option<i32>, // Duration in hours (40=regular, 80=intensive)
}
// Excel import - pendiente de fix
@@ -17,6 +17,7 @@ use uuid::Uuid;
#[derive(Debug, Deserialize)]
pub struct TestTemplateFilters {
pub mysql_course_id: Option<i32>, // Filter by MySQL course ID
pub level: Option<CourseLevel>,
pub course_type: Option<CourseType>,
pub test_type: Option<TestType>,
@@ -36,12 +37,12 @@ pub async fn create_test_template(
let template: TestTemplate = sqlx::query_as(
r#"
INSERT INTO test_templates (
organization_id, created_by, name, description, level, course_type,
test_type, duration_minutes, passing_score, total_points,
organization_id, created_by, name, description, mysql_course_id,
level, course_type, test_type, duration_minutes, passing_score, total_points,
instructions, template_data, tags
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
RETURNING id, organization_id, created_by, name, description, level, course_type,
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
RETURNING id, organization_id, mysql_course_id, name, description, level, course_type,
test_type, duration_minutes, passing_score, total_points, instructions,
template_data, tags, is_active, usage_count, created_at, updated_at
"#
@@ -50,8 +51,9 @@ pub async fn create_test_template(
.bind(claims.sub)
.bind(&payload.name)
.bind(&payload.description)
.bind(&payload.level)
.bind(&payload.course_type)
.bind(payload.mysql_course_id)
.bind(payload.level.as_ref())
.bind(payload.course_type.as_ref())
.bind(&payload.test_type)
.bind(payload.duration_minutes)
.bind(payload.passing_score)
@@ -78,6 +80,12 @@ pub async fn list_test_templates(
let mut query = String::from("SELECT * FROM test_templates WHERE organization_id = $1");
let mut param_count = 1;
// Filter by mysql_course_id
if filters.mysql_course_id.is_some() {
param_count += 1;
query.push_str(&format!(" AND mysql_course_id = ${}", param_count));
}
// Filter by level
if filters.level.is_some() {
param_count += 1;
@@ -116,6 +124,10 @@ pub async fn list_test_templates(
// Build query with dynamic binds
let mut sql_query = sqlx::query_as::<_, TestTemplate>(&query).bind(org_ctx.id);
if let Some(mysql_course_id) = &filters.mysql_course_id {
sql_query = sql_query.bind(mysql_course_id);
}
if let Some(level) = &filters.level {
sql_query = sql_query.bind(level);
}
@@ -220,22 +232,23 @@ pub async fn update_test_template(
let template: TestTemplate = sqlx::query_as(
r#"
UPDATE test_templates
SET
SET
name = COALESCE($3, name),
description = COALESCE($4, description),
level = COALESCE($5, level),
course_type = COALESCE($6, course_type),
test_type = COALESCE($7, test_type),
duration_minutes = COALESCE($8, duration_minutes),
passing_score = COALESCE($9, passing_score),
total_points = COALESCE($10, total_points),
instructions = COALESCE($11, instructions),
template_data = COALESCE($12, template_data),
tags = COALESCE($13, tags),
is_active = COALESCE($14, is_active),
mysql_course_id = COALESCE($5, mysql_course_id),
level = COALESCE($6, level),
course_type = COALESCE($7, course_type),
test_type = COALESCE($8, test_type),
duration_minutes = COALESCE($9, duration_minutes),
passing_score = COALESCE($10, passing_score),
total_points = COALESCE($11, total_points),
instructions = COALESCE($12, instructions),
template_data = COALESCE($13, template_data),
tags = COALESCE($14, tags),
is_active = COALESCE($15, is_active),
updated_at = NOW()
WHERE id = $1 AND organization_id = $2
RETURNING id, organization_id, created_by, name, description, level, course_type,
RETURNING id, organization_id, mysql_course_id, name, description, level, course_type,
test_type, duration_minutes, passing_score, total_points, instructions,
template_data, tags, is_active, usage_count, created_at, updated_at
"#
@@ -244,6 +257,7 @@ pub async fn update_test_template(
.bind(org_ctx.id)
.bind(payload.name)
.bind(payload.description)
.bind(payload.mysql_course_id)
.bind(payload.level)
.bind(payload.course_type)
.bind(payload.test_type)
@@ -615,70 +629,186 @@ pub struct ApplyTemplatePayload {
// ==================== RAG Question Generation ====================
/// POST /test-templates/generate-with-rag - Generate questions using RAG from MySQL question bank
/// POST /test-templates/generate-with-rag - Generate questions using RAG from imported MySQL question bank
/// Uses semantic search with pgvector embeddings when available, falls back to course_id filtering
pub async fn generate_questions_with_rag(
Org(org_ctx): Org,
claims: Claims,
State(pool): State<PgPool>,
Json(payload): Json<RagGenerationPayload>,
) -> Result<Json<Vec<TestTemplateQuestion>>, (StatusCode, String)> {
use common::ai::{self, generate_embedding};
use reqwest::Client;
use serde_json::json;
// 1. Fetch questions from external MySQL database (RAG context)
let mysql_url = std::env::var("MYSQL_DATABASE_URL")
.map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "MYSQL_DATABASE_URL not configured".to_string()))?;
// Create MySQL pool connection
let mysql_pool = sqlx::MySqlPool::connect(&mysql_url)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to connect to MySQL: {}", e)))?;
// Fetch questions from MySQL bank filtered by course if provided
let mysql_questions: Vec<MySqlQuestion> = if let Some(course_id) = payload.course_id {
sqlx::query_as(
r#"
SELECT
bp.descripcion,
bp.idTipoPregunta AS id_tipo_pregunta,
c.NombreCurso AS nombre_curso,
pe.Nombre as plan_nombre
FROM bancopreguntas bp
JOIN curso c ON bp.idCursos = c.idCursos
JOIN plandeestudios pe ON bp.idPlanDeEstudios = pe.idPlanDeEstudios
WHERE bp.idCursos = ? AND bp.activo = 1
LIMIT 20
"#
)
.bind(course_id)
.fetch_all(&mysql_pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to fetch questions: {}", e)))?
} else {
sqlx::query_as(
r#"
SELECT
bp.descripcion,
bp.idTipoPregunta AS id_tipo_pregunta,
c.NombreCurso AS nombre_curso,
pe.Nombre as plan_nombre
FROM bancopreguntas bp
JOIN curso c ON bp.idCursos = c.idCursos
JOIN plandeestudios pe ON bp.idPlanDeEstudios = pe.idPlanDeEstudios
WHERE bp.activo = 1
LIMIT 20
"#
)
.fetch_all(&mysql_pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to fetch questions: {}", e)))?
};
mysql_pool.close().await;
let mut mysql_questions: Vec<QuestionBankForRAG> = Vec::new();
// If topic is provided, use semantic search; otherwise use course_id filtering
if let Some(topic) = &payload.topic {
// Try semantic search with embeddings
// Create client that accepts invalid certificates (for dev with self-signed certs)
let client = reqwest::Client::builder()
.danger_accept_invalid_certs(true)
.danger_accept_invalid_hostnames(true)
.build()
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("HTTP client error: {}", e)))?;
let ollama_url = ai::get_ollama_url();
let model = ai::get_embedding_model();
match generate_embedding(&client, &ollama_url, &model, topic).await {
Ok(response) => {
let pgvector = ai::embedding_to_pgvector(&response.embedding);
// Semantic search in question_bank
mysql_questions = sqlx::query_as(
r#"
SELECT
qb.question_text as descripcion,
qb.options,
COALESCE(
(qb.source_metadata->>'idPlanDeEstudios')::integer,
0
) as id_plan_de_estudios,
COALESCE(
qb.source_metadata->>'plan_nombre',
''
) as plan_nombre,
COALESCE(
(qb.source_metadata->>'nivel_curso')::integer,
NULL
) as nivel_curso,
1 - (qb.embedding <=> $1::vector) AS similarity
FROM question_bank qb
WHERE qb.organization_id = $2
AND qb.embedding IS NOT NULL
ORDER BY qb.embedding <=> $1::vector
LIMIT $3
"#
)
.bind(&pgvector)
.bind(org_ctx.id)
.bind(payload.num_questions.unwrap_or(5) * 3) // Get more for diversity
.fetch_all(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Semantic search failed: {}", e)))?;
tracing::info!("Semantic search found {} similar questions", mysql_questions.len());
}
Err(e) => {
tracing::warn!("Semantic search failed, falling back to keyword search: {}", e);
// Fall back to text search
mysql_questions = sqlx::query_as(
r#"
SELECT
qb.question_text as descripcion,
qb.options,
COALESCE(
(qb.source_metadata->>'idPlanDeEstudios')::integer,
0
) as id_plan_de_estudios,
COALESCE(
qb.source_metadata->>'plan_nombre',
''
) as plan_nombre,
COALESCE(
(qb.source_metadata->>'nivel_curso')::integer,
NULL
) as nivel_curso
FROM question_bank qb
WHERE qb.organization_id = $1
AND qb.question_text ILIKE $2
LIMIT $3
"#
)
.bind(org_ctx.id)
.bind(&format!("%{}%", topic))
.bind(payload.num_questions.unwrap_or(5) * 3)
.fetch_all(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Keyword search failed: {}", e)))?;
}
}
} else if let Some(course_id) = payload.course_id {
// Fetch questions from imported MySQL questions in PostgreSQL question_bank
// Filter by course_id if provided (mysql_course_id from imported metadata)
mysql_questions = sqlx::query_as(
r#"
SELECT
qb.question_text as descripcion,
qb.options,
COALESCE(
(qb.source_metadata->>'idPlanDeEstudios')::integer,
0
) as id_plan_de_estudios,
COALESCE(
qb.source_metadata->>'plan_nombre',
''
) as plan_nombre,
COALESCE(
(qb.source_metadata->>'nivel_curso')::integer,
NULL
) as nivel_curso
FROM question_bank qb
WHERE qb.organization_id = $1
AND qb.source = 'imported-mysql'
AND (qb.source_metadata->>'idCursos')::integer = $2
LIMIT 20
"#
)
.bind(org_ctx.id)
.bind(course_id)
.fetch_all(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to fetch questions: {}", e)))?;
} else {
// Fetch all imported MySQL questions for this organization
mysql_questions = sqlx::query_as(
r#"
SELECT
qb.question_text as descripcion,
qb.options,
COALESCE(
(qb.source_metadata->>'idPlanDeEstudios')::integer,
0
) as id_plan_de_estudios,
COALESCE(
qb.source_metadata->>'plan_nombre',
''
) as plan_nombre,
COALESCE(
(qb.source_metadata->>'nivel_curso')::integer,
NULL
) as nivel_curso
FROM question_bank qb
WHERE qb.organization_id = $1
AND qb.source = 'imported-mysql'
LIMIT 20
"#
)
.bind(org_ctx.id)
.fetch_all(&pool)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to fetch questions: {}", e)))?;
}
if mysql_questions.is_empty() && payload.course_id.is_some() {
return Err((StatusCode::NOT_FOUND, "No questions found in MySQL bank for this course".to_string()));
return Err((StatusCode::NOT_FOUND, "No questions found in imported question bank for this course. Please import questions from MySQL first.".to_string()));
}
// Determine course_type and level from imported data
let course_type = mysql_questions
.first()
.map(|q| get_course_type_from_plan(&q.plan_nombre))
.unwrap_or(CourseType::Regular);
let level = mysql_questions
.first()
.map(|q| get_course_level_from_mysql(q.nivel_curso, &q.plan_nombre, ""))
.unwrap_or(CourseLevel::Intermediate);
tracing::info!("Determined course_type: {:?}, level: {:?} from imported data", course_type, level);
// 2. Build RAG context from MySQL questions (lightweight format)
let rag_context: String = mysql_questions
.iter()
@@ -715,19 +845,25 @@ pub async fn generate_questions_with_rag(
Create {} ORIGINAL multiple-choice questions about: {}
Return ONLY a JSON array with this structure:
IMPORTANT - Return ONLY a JSON array with this EXACT structure:
[
{{
"question_text": "Question text",
"question_text": "The tourist got lost in the ______ of the city.",
"question_type": "multiple-choice",
"options": ["A", "B", "C", "D"],
"options": ["downtown", "countryside", "mountains", "desert"],
"correct_answer": 0,
"explanation": "Why this is correct",
"explanation": "Downtown is the main area of a city where tourists typically visit.",
"points": 1,
"skill_assessed": "reading"
}}
]
RULES FOR OPTIONS:
- Each option must be ONLY the answer text (1-3 words max)
- Do NOT include letters like "A.", "B.", "a)", "b)"
- Do NOT include "Option 1:", "Answer:", or any prefix
- Just the pure answer text (e.g., "downtown", "Paris", "True")
Skills: reading, listening, speaking, writing. Distribute across all 4."#,
rag_context,
num_questions,
@@ -777,21 +913,118 @@ pub async fn generate_questions_with_rag(
.and_then(|content| content.as_str())
.and_then(|content| serde_json::from_str::<serde_json::Value>(content).ok())
.and_then(|data| {
if let Some(questions) = data.get("questions").or(data.get("items")) {
questions.as_array().cloned()
} else if let Some(arr) = data.as_array() {
Some(arr.clone())
} else {
None
// Try multiple formats:
// 1. Standard array format: [...]
if let Some(arr) = data.as_array() {
return Some(arr.clone());
}
// 2. Wrapped format: {questions: [...]} or {items: [...]}
if let Some(questions) = data.get("questions").or(data.get("items")) {
return questions.as_array().cloned();
}
// 3. Object format with numbered keys: {q1: {...}, q2: {...}, ...}
if let Some(obj) = data.as_object() {
let questions: Vec<serde_json::Value> = obj.values().cloned().collect();
if !questions.is_empty() {
return Some(questions);
}
}
None
})
.unwrap_or_default();
// Helper function to clean options (remove "A.", "B.", "a)", etc.)
let clean_option = |opt: &str| -> String {
let opt = opt.trim();
// Remove patterns like "A.", "B.", "a)", "b)", "1.", "1)", "A)", "B)"
let patterns = [
(r"^[A-Za-z]\.\s*", ""), // "A. ", "B. "
(r"^[A-Za-z]\)\s*", ""), // "A) ", "B) "
(r"^\d+\.\s*", ""), // "1. ", "2. "
(r"^\d+\)\s*", ""), // "1) ", "2) "
(r"^Option\s+[A-Za-z]\.?\s*", ""), // "Option A. ", "Option B "
(r"^Answer\s*[:\.]?\s*", ""), // "Answer: ", "Answer. "
];
let mut cleaned = opt.to_string();
for (pattern, replacement) in patterns.iter() {
if let Ok(re) = regex::Regex::new(pattern) {
cleaned = re.replace(&cleaned, *replacement).to_string();
}
}
cleaned.trim().to_string()
};
// Helper function to shuffle options and adjust correct_answer index
let shuffle_options = |options: Vec<String>, correct_answer: Option<i64>| -> (Vec<String>, Option<i64>) {
use rand::seq::SliceRandom;
use rand::thread_rng;
if options.is_empty() || correct_answer.is_none() {
return (options, correct_answer);
}
let correct_idx = correct_answer.unwrap() as usize;
if correct_idx >= options.len() {
return (options, correct_answer);
}
// Store the correct answer text
let correct_answer_text = options[correct_idx].clone();
// Create a vector of indices and shuffle it
let mut indices: Vec<usize> = (0..options.len()).collect();
let mut rng = thread_rng();
indices.shuffle(&mut rng);
// Reorder options according to shuffled indices
let shuffled_options: Vec<String> = indices.iter().map(|&i| options[i].clone()).collect();
// Find the new position of the correct answer
let new_correct_idx = shuffled_options
.iter()
.position(|opt| opt == &correct_answer_text)
.map(|idx| idx as i64);
(shuffled_options, new_correct_idx)
};
// Convert to TestTemplateQuestion format
let generated_questions: Vec<TestTemplateQuestion> = questions_data
.iter()
.enumerate()
.map(|(idx, q)| {
// Get original options and correct answer
let original_options: Vec<String> = q
.get("options")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str())
.map(|s| clean_option(s))
.collect()
})
.unwrap_or_default();
let original_correct_idx: Option<usize> = q
.get("correct_answer")
.or(q.get("correct"))
.and_then(|v| v.as_i64())
.map(|idx| idx as usize);
// Shuffle options if we have valid data
let (options, correct_answer) = if !original_options.is_empty() && original_correct_idx.is_some() {
let correct_idx = original_correct_idx.unwrap();
if correct_idx < original_options.len() {
let (shuffled, new_correct_idx) = shuffle_options(original_options.clone(), Some(correct_idx as i64));
(Some(json!(shuffled)), new_correct_idx.map(|idx| json!(idx)))
} else {
(Some(json!(original_options)), q.get("correct_answer").or(q.get("correct")).cloned())
}
} else {
(Some(json!(original_options)), q.get("correct_answer").or(q.get("correct")).cloned())
};
TestTemplateQuestion {
id: Uuid::new_v4(),
template_id: Uuid::nil(),
@@ -799,14 +1032,15 @@ pub async fn generate_questions_with_rag(
question_order: idx as i32,
question_type: q.get("question_type").and_then(|v| v.as_str()).unwrap_or("multiple-choice").to_string(),
question_text: q.get("question_text").and_then(|v| v.as_str()).unwrap_or("Question").to_string(),
options: q.get("options").cloned(),
correct_answer: q.get("correct_answer").or(q.get("correct")).cloned(),
options,
correct_answer,
explanation: q.get("explanation").and_then(|v| v.as_str()).map(String::from),
points: q.get("points").and_then(|v| v.as_i64()).unwrap_or(1) as i32,
metadata: Some(json!({
"generated_by": "rag-ai",
"source": "mysql-bank",
"generated_at": chrono::Utc::now().to_rfc3339(),
"options_shuffled": true,
})),
created_at: chrono::Utc::now(),
}
@@ -874,15 +1108,64 @@ pub async fn generate_questions_with_rag(
#[derive(Debug, Deserialize)]
pub struct RagGenerationPayload {
pub course_id: Option<i32>, // MySQL course ID
pub course_id: Option<i32>, // MySQL course ID from imported metadata
pub topic: Option<String>,
pub num_questions: Option<i32>,
}
#[derive(Debug, sqlx::FromRow)]
struct QuestionBankForRAG {
descripcion: String,
options: Option<serde_json::Value>,
id_plan_de_estudios: i32,
plan_nombre: String,
nivel_curso: Option<i32>,
#[sqlx(default)]
similarity: Option<f32>,
}
#[derive(Debug, sqlx::FromRow)]
struct MySqlQuestion {
descripcion: String,
id_tipo_pregunta: i32,
nombre_curso: String,
plan_nombre: String,
nivel_curso: Option<i32>,
id_plan_de_estudios: i32,
}
/// Helper function to determine course type from plan name
fn get_course_type_from_plan(plan_name: &str) -> CourseType {
let plan_lower = plan_name.to_lowercase();
if plan_lower.contains("intensive") || plan_lower.contains("intensivo") {
CourseType::Intensive
} else {
CourseType::Regular
}
}
/// Helper function to determine course level from MySQL data
fn get_course_level_from_mysql(nivel_curso: Option<i32>, plan_nombre: &str, _nombre_curso: &str) -> CourseLevel {
// Try to determine level from nivel_curso field first
if let Some(nivel) = nivel_curso {
return match nivel {
1..=2 => CourseLevel::Beginner,
3..=4 => CourseLevel::Beginner_1,
5..=6 => CourseLevel::Beginner_2,
7..=8 => CourseLevel::Intermediate,
9..=10 => CourseLevel::Intermediate_1,
11..=12 => CourseLevel::Intermediate_2,
_ => CourseLevel::Advanced,
};
}
// Fallback: try to extract level from plan name
let plan_lower = plan_nombre.to_lowercase();
if plan_lower.contains("basic") || plan_lower.contains("beginner") {
CourseLevel::Beginner
} else if plan_lower.contains("intermediate") || plan_lower.contains("intermedio") {
CourseLevel::Intermediate
} else {
CourseLevel::Advanced
}
}
+23 -6
View File
@@ -10,6 +10,7 @@ mod handlers_rubrics;
mod handlers_test_templates;
mod handlers_question_bank;
mod handlers_admin;
mod handlers_embeddings;
mod webhooks;
use axum::{
@@ -343,9 +344,13 @@ async fn main() {
"/question-bank/import-mysql",
post(handlers_question_bank::import_from_mysql),
)
.route(
"/question-bank/mysql-plans",
get(handlers_question_bank::get_mysql_plans),
)
.route(
"/question-bank/mysql-courses",
get(handlers_question_bank::list_mysql_courses),
get(handlers_question_bank::get_mysql_courses_by_plan),
)
.route(
"/question-bank/import-mysql-all",
@@ -355,11 +360,23 @@ async fn main() {
"/question-bank/ai-generate",
post(handlers_question_bank::ai_generate_question),
)
// Excel import - pendiente de fix
// .route(
// "/question-bank/import-excel",
// post(handlers_question_bank::import_from_excel),
// )
// Embedding routes for semantic search
.route(
"/question-bank/embeddings/generate",
post(handlers_embeddings::generate_question_embeddings),
)
.route(
"/question-bank/semantic-search",
get(handlers_embeddings::semantic_search),
)
.route(
"/question-bank/similar/{id}",
get(handlers_embeddings::find_similar_questions),
)
.route(
"/question-bank/{id}/embedding/regenerate",
post(handlers_embeddings::regenerate_question_embedding),
)
// Admin routes
.route(
"/admin/token-usage",