feat: Implement SAM structure mirroring in PostgreSQL for study plans and courses

- Added functionality to save study plans and courses in SAM format to PostgreSQL.
- Updated SQL queries to reflect SAM-native column names and handle conflicts appropriately.
- Introduced new fields in the Asset model for English level and SAM identifiers.
- Enhanced the TestTemplateForm component to manage linked assets and shared materials.
- Created a new AdminSharedMaterialsPage for uploading ZIP files of shared materials.
- Added migrations to create SAM mirror tables and update the assets table with new columns.
This commit is contained in:
2026-04-06 17:04:36 -04:00
parent eea456cd95
commit 7f9b9d69ae
12 changed files with 795 additions and 59 deletions
+132 -15
View File
@@ -19,6 +19,7 @@ use uuid::Uuid;
use std::env;
use std::path::Path as StdPath;
use tokio::process::Command;
use tokio::io::AsyncWriteExt;
#[derive(Debug, Serialize)]
pub struct AssetUploadResponse {
@@ -49,6 +50,9 @@ pub struct AssetZipImportResponse {
pub struct AssetFilters {
pub mimetype: Option<String>,
pub course_id: Option<Uuid>,
pub english_level: Option<String>,
pub sam_plan_id: Option<i32>,
pub sam_course_id: Option<i32>,
pub search: Option<String>,
pub page: Option<u32>,
pub limit: Option<u32>,
@@ -239,6 +243,9 @@ pub async fn upload_asset(
let mut data = Vec::new();
let mut mimetype = String::new();
let mut course_id: Option<Uuid> = None;
let mut english_level: Option<String> = None;
let mut sam_plan_id: Option<i32> = None;
let mut sam_course_id: Option<i32> = None;
while let Some(field) = multipart
.next_field()
@@ -263,6 +270,25 @@ pub async fn upload_asset(
course_id = Some(id);
}
}
} else if name == "english_level" {
if let Ok(txt) = field.text().await {
let value = txt.trim();
if !value.is_empty() {
english_level = Some(value.to_string());
}
}
} else if name == "sam_plan_id" {
if let Ok(txt) = field.text().await {
if let Ok(id) = txt.trim().parse::<i32>() {
sam_plan_id = Some(id);
}
}
} else if name == "sam_course_id" {
if let Ok(txt) = field.text().await {
if let Ok(id) = txt.trim().parse::<i32>() {
sam_course_id = Some(id);
}
}
}
}
@@ -339,14 +365,17 @@ pub async fn upload_asset(
// Record in DB
sqlx::query(
r#"
INSERT INTO assets (id, organization_id, uploaded_by, course_id, filename, storage_path, mimetype, size_bytes)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
INSERT INTO assets (id, organization_id, uploaded_by, course_id, english_level, sam_plan_id, sam_course_id, filename, storage_path, mimetype, size_bytes)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
"#,
)
.bind(asset_id)
.bind(org_ctx.id)
.bind(claims.sub)
.bind(course_id)
.bind(&english_level)
.bind(sam_plan_id)
.bind(sam_course_id)
.bind(&stored_filename)
.bind(&db_storage_path)
.bind(&stored_mimetype)
@@ -386,6 +415,21 @@ pub async fn list_assets(
param_index += 1;
}
if filters.english_level.is_some() {
query.push_str(&format!(" AND english_level = ${}", param_index));
param_index += 1;
}
if filters.sam_plan_id.is_some() {
query.push_str(&format!(" AND sam_plan_id = ${}", param_index));
param_index += 1;
}
if filters.sam_course_id.is_some() {
query.push_str(&format!(" AND sam_course_id = ${}", param_index));
param_index += 1;
}
if filters.search.is_some() {
query.push_str(&format!(" AND filename ILIKE ${}", param_index));
param_index += 1;
@@ -403,6 +447,18 @@ pub async fn list_assets(
sql_query = sql_query.bind(cid);
}
if let Some(level) = &filters.english_level {
sql_query = sql_query.bind(level);
}
if let Some(plan_id) = filters.sam_plan_id {
sql_query = sql_query.bind(plan_id);
}
if let Some(course_id) = filters.sam_course_id {
sql_query = sql_query.bind(course_id);
}
if let Some(search) = &filters.search {
sql_query = sql_query.bind(format!("%{}%", search));
}
@@ -551,11 +607,14 @@ pub async fn import_assets_zip(
State(pool): State<PgPool>,
mut multipart: Multipart,
) -> Result<Json<AssetZipImportResponse>, (StatusCode, String)> {
let mut zip_data = Vec::new();
let mut zip_temp_path: Option<String> = None;
let mut course_id: Option<Uuid> = None;
let mut english_level: Option<String> = None;
let mut sam_plan_id: Option<i32> = None;
let mut sam_course_id: Option<i32> = None;
let mut ingest_rag = false;
while let Some(field) = multipart
while let Some(mut field) = multipart
.next_field()
.await
.map_err(|e| (StatusCode::BAD_REQUEST, e.to_string()))?
@@ -563,11 +622,32 @@ pub async fn import_assets_zip(
let name = field.name().unwrap_or_default().to_string();
if name == "file" {
zip_data = field
.bytes()
let temp_name = format!("uploads/tmp/import-{}.zip", Uuid::new_v4());
tokio::fs::create_dir_all("uploads/tmp")
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
.to_vec();
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to create temp dir: {}", e)))?;
let mut temp_file = tokio::fs::File::create(&temp_name)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to create temp zip file: {}", e)))?;
while let Some(chunk) = field
.chunk()
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to read upload chunk: {}", e)))?
{
temp_file
.write_all(&chunk)
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to write temp zip file: {}", e)))?;
}
temp_file
.flush()
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to flush temp zip file: {}", e)))?;
zip_temp_path = Some(temp_name);
} else if name == "course_id" {
if let Ok(txt) = field.text().await {
if let Ok(id) = Uuid::parse_str(txt.trim()) {
@@ -579,17 +659,46 @@ pub async fn import_assets_zip(
let v = txt.trim().to_lowercase();
ingest_rag = v == "1" || v == "true" || v == "yes";
}
} else if name == "english_level" {
if let Ok(txt) = field.text().await {
let value = txt.trim();
if !value.is_empty() {
english_level = Some(value.to_string());
}
}
} else if name == "sam_plan_id" {
if let Ok(txt) = field.text().await {
if let Ok(id) = txt.trim().parse::<i32>() {
sam_plan_id = Some(id);
}
}
} else if name == "sam_course_id" {
if let Ok(txt) = field.text().await {
if let Ok(id) = txt.trim().parse::<i32>() {
sam_course_id = Some(id);
}
}
}
}
if zip_data.is_empty() {
let zip_path = match zip_temp_path {
Some(path) => path,
None => {
return Err((StatusCode::BAD_REQUEST, "No ZIP file uploaded".to_string()));
}
};
let zip_file = std::fs::File::open(&zip_path)
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to open temp zip file: {}", e)))?;
let mut archive = zip::ZipArchive::new(zip_file)
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid ZIP file".to_string()))?;
if archive.is_empty() {
let _ = tokio::fs::remove_file(&zip_path).await;
return Err((StatusCode::BAD_REQUEST, "No ZIP file uploaded".to_string()));
}
let reader = std::io::Cursor::new(zip_data);
let mut archive = zip::ZipArchive::new(reader)
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid ZIP file".to_string()))?;
let mut imported_assets = 0usize;
let mut rag_ingested_assets = 0usize;
let mut rag_chunks_ingested = 0usize;
@@ -725,14 +834,17 @@ pub async fn import_assets_zip(
let insert_result = sqlx::query(
r#"
INSERT INTO assets (id, organization_id, uploaded_by, course_id, filename, storage_path, mimetype, size_bytes)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
INSERT INTO assets (id, organization_id, uploaded_by, course_id, english_level, sam_plan_id, sam_course_id, filename, storage_path, mimetype, size_bytes)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
"#,
)
.bind(asset_id)
.bind(org_ctx.id)
.bind(claims.sub)
.bind(course_id)
.bind(&english_level)
.bind(sam_plan_id)
.bind(sam_course_id)
.bind(&stored_filename)
.bind(&db_storage_path)
.bind(&mimetype)
@@ -753,6 +865,9 @@ pub async fn import_assets_zip(
organization_id: org_ctx.id,
uploaded_by: Some(claims.sub),
course_id,
english_level: english_level.clone(),
sam_plan_id,
sam_course_id,
filename: stored_filename.clone(),
storage_path: db_storage_path.clone(),
mimetype: mimetype.clone(),
@@ -820,6 +935,8 @@ pub async fn import_assets_zip(
}
}
let _ = tokio::fs::remove_file(&zip_path).await;
Ok(Json(AssetZipImportResponse {
imported_assets,
rag_ingested_assets,
@@ -103,6 +103,24 @@ pub async fn save_mysql_courses_and_plans(
let course_type = calculate_course_type(&plan.nombre_plan);
tracing::debug!("Saving study plan: {} (ID: {})", plan.nombre_plan, plan.id_plan_de_estudios);
// Mirror SAM structure in PostgreSQL using SAM-native column names.
sqlx::query(
r#"
INSERT INTO sam_study_plans (organization_id, idPlanDeEstudios, Nombre, Activo)
VALUES ($1, $2, $3, TRUE)
ON CONFLICT (organization_id, idPlanDeEstudios) DO UPDATE SET
Nombre = EXCLUDED.Nombre,
Activo = EXCLUDED.Activo,
updated_at = NOW()
"#
)
.bind(org_id)
.bind(plan.id_plan_de_estudios)
.bind(&plan.nombre_plan)
.execute(pool)
.await
.map_err(|e| format!("Failed to save SAM study plan mirror: {}", e))?;
sqlx::query(
r#"
INSERT INTO mysql_study_plans (mysql_id, organization_id, name, course_type)
@@ -129,6 +147,31 @@ pub async fn save_mysql_courses_and_plans(
let level_calculated = calculate_course_level(course.nivel_curso);
tracing::debug!("Saving course: {} (ID: {}, Plan ID: {})", course.nombre_curso, course.id_cursos, course.id_plan_de_estudios);
sqlx::query(
r#"
INSERT INTO sam_courses (
organization_id, idCursos, idPlanDeEstudios, NombreCurso, NivelCurso, Duracion, Activo
)
VALUES ($1, $2, $3, $4, $5, $6, TRUE)
ON CONFLICT (organization_id, idCursos) DO UPDATE SET
idPlanDeEstudios = EXCLUDED.idPlanDeEstudios,
NombreCurso = EXCLUDED.NombreCurso,
NivelCurso = EXCLUDED.NivelCurso,
Duracion = EXCLUDED.Duracion,
Activo = EXCLUDED.Activo,
updated_at = NOW()
"#,
)
.bind(org_id)
.bind(course.id_cursos)
.bind(course.id_plan_de_estudios)
.bind(&course.nombre_curso)
.bind(course.nivel_curso)
.bind(course.duracion)
.execute(pool)
.await
.map_err(|e| format!("Failed to save SAM course mirror: {}", e))?;
// Get study_plan_id from mysql_study_plans
let study_plan_id: i32 = sqlx::query_scalar(
"SELECT id FROM mysql_study_plans WHERE mysql_id = $1 AND organization_id = $2"
@@ -820,15 +863,15 @@ pub async fn get_mysql_plans(
Org(org_ctx): Org,
State(pool): State<PgPool>,
) -> Result<Json<Vec<MySqlPlanInfo>>, (StatusCode, String)> {
// Fetch all study plans from PostgreSQL
// Read from SAM mirror in PostgreSQL with SAM-native fields.
let plans: Vec<MySqlPlanInfo> = sqlx::query_as(
r#"
SELECT
mysql_id as id_plan_de_estudios,
name as nombre_plan
FROM mysql_study_plans
WHERE organization_id = $1 AND is_active = true
ORDER BY name
idPlanDeEstudios AS id_plan_de_estudios,
Nombre AS nombre_plan
FROM sam_study_plans
WHERE organization_id = $1 AND Activo = TRUE
ORDER BY Nombre
"#
)
.bind(org_ctx.id)
@@ -845,22 +888,25 @@ pub async fn get_mysql_courses_by_plan(
State(pool): State<PgPool>,
Query(filters): Query<MySqlCoursesFilters>,
) -> Result<Json<Vec<MySqlCourseInfo>>, (StatusCode, String)> {
// Fetch courses filtered by plan from PostgreSQL
// Read from SAM mirror in PostgreSQL with SAM-native fields.
let courses: Vec<MySqlCourseInfo> = sqlx::query_as(
r#"
SELECT
c.mysql_id as id_cursos,
c.name as nombre_curso,
c.level as nivel_curso,
sp.mysql_id as id_plan_de_estudios,
sp.name as nombre_plan,
c.duracion::double precision as duracion
FROM mysql_courses c
JOIN mysql_study_plans sp ON c.study_plan_id = sp.id
c.idCursos AS id_cursos,
c.NombreCurso AS nombre_curso,
c.NivelCurso AS nivel_curso,
c.idPlanDeEstudios AS id_plan_de_estudios,
p.Nombre AS nombre_plan,
c.Duracion AS duracion
FROM sam_courses c
JOIN sam_study_plans p
ON p.organization_id = c.organization_id
AND p.idPlanDeEstudios = c.idPlanDeEstudios
WHERE c.organization_id = $1
AND c.is_active = true
AND sp.mysql_id = $2
ORDER BY c.level
AND c.Activo = TRUE
AND p.Activo = TRUE
AND c.idPlanDeEstudios = $2
ORDER BY c.NivelCurso
"#
)
.bind(org_ctx.id)