feat: Implement multi-tenancy with new database migrations, API updates across services, and refactor frontend API calls.
This commit is contained in:
@@ -3,8 +3,9 @@ use axum::{
|
||||
http::StatusCode,
|
||||
Json,
|
||||
};
|
||||
use common::models::{Course, Enrollment, Module, Lesson, User, UserResponse, AuthResponse, CourseAnalytics, LessonAnalytics};
|
||||
use common::auth::create_jwt;
|
||||
use common::models::{Course, Enrollment, Module, Lesson, User, UserResponse, AuthResponse, CourseAnalytics, LessonAnalytics, Organization};
|
||||
use common::auth::{create_jwt, Claims};
|
||||
use common::middleware::Org;
|
||||
use sqlx::{PgPool, Row};
|
||||
use uuid::Uuid;
|
||||
use serde::Deserialize;
|
||||
@@ -12,18 +13,20 @@ use bcrypt::{hash, verify, DEFAULT_COST};
|
||||
|
||||
pub async fn enroll_user(
|
||||
State(pool): State<PgPool>,
|
||||
Org(org_ctx): Org,
|
||||
claims: Claims,
|
||||
Json(payload): Json<serde_json::Value>,
|
||||
) -> Result<Json<Enrollment>, StatusCode> {
|
||||
let course_id_str = payload.get("course_id").and_then(|v| v.as_str()).ok_or(StatusCode::BAD_REQUEST)?;
|
||||
let course_id = Uuid::parse_str(course_id_str).map_err(|_| StatusCode::BAD_REQUEST)?;
|
||||
let user_id_str = payload.get("user_id").and_then(|v| v.as_str()).ok_or(StatusCode::BAD_REQUEST)?;
|
||||
let user_id = Uuid::parse_str(user_id_str).map_err(|_| StatusCode::BAD_REQUEST)?;
|
||||
let user_id = claims.sub;
|
||||
|
||||
let enrollment = sqlx::query_as::<_, Enrollment>(
|
||||
"INSERT INTO enrollments (user_id, course_id) VALUES ($1, $2) RETURNING *"
|
||||
"INSERT INTO enrollments (user_id, course_id, organization_id) VALUES ($1, $2, $3) RETURNING *"
|
||||
)
|
||||
.bind(user_id)
|
||||
.bind(course_id)
|
||||
.bind(org_ctx.id)
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
@@ -39,6 +42,7 @@ pub struct AuthPayload {
|
||||
pub email: String,
|
||||
pub password: String,
|
||||
pub full_name: Option<String>,
|
||||
pub organization_name: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
@@ -59,17 +63,36 @@ pub async fn register(
|
||||
|
||||
let full_name = payload.full_name.unwrap_or_else(|| payload.email.split('@').next().unwrap_or("Student").to_string());
|
||||
|
||||
// Find or create organization
|
||||
let org_name = payload.organization_name.unwrap_or_else(|| {
|
||||
let parts: Vec<&str> = payload.email.split('@').collect();
|
||||
parts.get(1).unwrap_or(&"default.com").to_string()
|
||||
});
|
||||
|
||||
let mut tx = pool.begin().await.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
|
||||
|
||||
let organization = sqlx::query_as::<_, Organization>(
|
||||
"INSERT INTO organizations (name) VALUES ($1) ON CONFLICT (name) DO UPDATE SET name = EXCLUDED.name RETURNING *"
|
||||
)
|
||||
.bind(&org_name)
|
||||
.fetch_one(&mut *tx)
|
||||
.await
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Failed to find or create organization: {}", e)))?;
|
||||
|
||||
let user = sqlx::query_as::<_, User>(
|
||||
"INSERT INTO users (email, password_hash, full_name) VALUES ($1, $2, $3) RETURNING *"
|
||||
"INSERT INTO users (email, password_hash, full_name, organization_id, role) VALUES ($1, $2, $3, $4, 'student') RETURNING *"
|
||||
)
|
||||
.bind(&payload.email)
|
||||
.bind(password_hash)
|
||||
.bind(full_name)
|
||||
.fetch_one(&pool)
|
||||
.bind(organization.id)
|
||||
.fetch_one(&mut *tx)
|
||||
.await
|
||||
.map_err(|e| (StatusCode::CONFLICT, format!("User already exists or DB error: {}", e)))?;
|
||||
|
||||
let token = create_jwt(user.id, "student")
|
||||
tx.commit().await.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
|
||||
|
||||
let token = create_jwt(user.id, user.organization_id, "student")
|
||||
.map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "JWT generation failed".into()))?;
|
||||
|
||||
Ok(Json(AuthResponse {
|
||||
@@ -97,7 +120,7 @@ pub async fn login(
|
||||
return Err((StatusCode::UNAUTHORIZED, "Invalid credentials".into()));
|
||||
}
|
||||
|
||||
let token = create_jwt(user.id, "student")
|
||||
let token = create_jwt(user.id, user.organization_id, "student")
|
||||
.map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "JWT generation failed".into()))?;
|
||||
|
||||
Ok(Json(AuthResponse {
|
||||
@@ -129,9 +152,10 @@ pub async fn ingest_course(
|
||||
let mut tx = pool.begin().await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
|
||||
// 1. Upsert Course
|
||||
let org_id = payload.course.organization_id;
|
||||
sqlx::query(
|
||||
"INSERT INTO courses (id, title, description, instructor_id, start_date, end_date, passing_percentage, certificate_template, updated_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
||||
"INSERT INTO courses (id, title, description, instructor_id, start_date, end_date, passing_percentage, certificate_template, updated_at, organization_id)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
|
||||
ON CONFLICT (id) DO UPDATE SET
|
||||
title = EXCLUDED.title,
|
||||
description = EXCLUDED.description,
|
||||
@@ -140,7 +164,8 @@ pub async fn ingest_course(
|
||||
end_date = EXCLUDED.end_date,
|
||||
passing_percentage = EXCLUDED.passing_percentage,
|
||||
certificate_template = EXCLUDED.certificate_template,
|
||||
updated_at = EXCLUDED.updated_at"
|
||||
updated_at = EXCLUDED.updated_at,
|
||||
organization_id = EXCLUDED.organization_id"
|
||||
)
|
||||
.bind(payload.course.id)
|
||||
.bind(&payload.course.title)
|
||||
@@ -151,6 +176,7 @@ pub async fn ingest_course(
|
||||
.bind(payload.course.passing_percentage)
|
||||
.bind(&payload.course.certificate_template)
|
||||
.bind(payload.course.updated_at)
|
||||
.bind(org_id)
|
||||
.execute(&mut *tx)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
@@ -174,8 +200,8 @@ pub async fn ingest_course(
|
||||
// 3. Insert Grading Categories
|
||||
for cat in payload.grading_categories {
|
||||
sqlx::query(
|
||||
"INSERT INTO grading_categories (id, course_id, name, weight, drop_count, created_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6)"
|
||||
"INSERT INTO grading_categories (id, course_id, name, weight, drop_count, created_at, organization_id)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7)"
|
||||
)
|
||||
.bind(cat.id)
|
||||
.bind(payload.course.id)
|
||||
@@ -183,6 +209,7 @@ pub async fn ingest_course(
|
||||
.bind(cat.weight)
|
||||
.bind(cat.drop_count)
|
||||
.bind(cat.created_at)
|
||||
.bind(org_id)
|
||||
.execute(&mut *tx)
|
||||
.await
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
@@ -191,22 +218,23 @@ pub async fn ingest_course(
|
||||
// 4. Insert Modules and Lessons
|
||||
for pub_module in payload.modules {
|
||||
sqlx::query(
|
||||
"INSERT INTO modules (id, course_id, title, position, created_at)
|
||||
VALUES ($1, $2, $3, $4, $5)"
|
||||
"INSERT INTO modules (id, course_id, title, position, created_at, organization_id)
|
||||
VALUES ($1, $2, $3, $4, $5, $6)"
|
||||
)
|
||||
.bind(pub_module.module.id)
|
||||
.bind(payload.course.id)
|
||||
.bind(&pub_module.module.title)
|
||||
.bind(pub_module.module.position)
|
||||
.bind(pub_module.module.created_at)
|
||||
.bind(org_id)
|
||||
.execute(&mut *tx)
|
||||
.await
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
|
||||
for lesson in pub_module.lessons {
|
||||
sqlx::query(
|
||||
"INSERT INTO lessons (id, module_id, title, content_type, content_url, transcription, metadata, position, created_at, is_graded, grading_category_id, max_attempts, allow_retry)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)"
|
||||
"INSERT INTO lessons (id, module_id, title, content_type, content_url, transcription, metadata, position, created_at, is_graded, grading_category_id, max_attempts, allow_retry, organization_id)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)"
|
||||
)
|
||||
.bind(lesson.id)
|
||||
.bind(pub_module.module.id)
|
||||
@@ -221,6 +249,7 @@ pub async fn ingest_course(
|
||||
.bind(lesson.grading_category_id)
|
||||
.bind(lesson.max_attempts)
|
||||
.bind(lesson.allow_retry)
|
||||
.bind(org_id)
|
||||
.execute(&mut *tx)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
@@ -236,19 +265,22 @@ pub async fn ingest_course(
|
||||
}
|
||||
|
||||
pub async fn get_course_outline(
|
||||
Org(org_ctx): Org,
|
||||
State(pool): State<PgPool>,
|
||||
Path(id): Path<Uuid>,
|
||||
) -> Result<Json<common::models::PublishedCourse>, StatusCode> {
|
||||
// 1. Fetch Course
|
||||
let course = sqlx::query_as::<_, Course>("SELECT * FROM courses WHERE id = $1")
|
||||
let course = sqlx::query_as::<_, Course>("SELECT * FROM courses WHERE id = $1 AND organization_id = $2")
|
||||
.bind(id)
|
||||
.bind(org_ctx.id)
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.map_err(|_| StatusCode::NOT_FOUND)?;
|
||||
|
||||
// 2. Fetch Modules
|
||||
let modules = sqlx::query_as::<_, Module>("SELECT * FROM modules WHERE course_id = $1 ORDER BY position")
|
||||
let modules = sqlx::query_as::<_, Module>("SELECT * FROM modules WHERE course_id = $1 AND organization_id = $2 ORDER BY position")
|
||||
.bind(id)
|
||||
.bind(org_ctx.id)
|
||||
.fetch_all(&pool)
|
||||
.await
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
@@ -258,6 +290,7 @@ pub async fn get_course_outline(
|
||||
"SELECT * FROM grading_categories WHERE course_id = $1 ORDER BY created_at"
|
||||
)
|
||||
.bind(id)
|
||||
.bind(org_ctx.id)
|
||||
.fetch_all(&pool)
|
||||
.await
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
@@ -265,8 +298,9 @@ pub async fn get_course_outline(
|
||||
// 4. Fetch Lessons
|
||||
let mut pub_modules = Vec::new();
|
||||
for module in modules {
|
||||
let lessons = sqlx::query_as::<_, Lesson>("SELECT * FROM lessons WHERE module_id = $1 ORDER BY position")
|
||||
let lessons = sqlx::query_as::<_, Lesson>("SELECT * FROM lessons WHERE module_id = $1 AND organization_id = $2 ORDER BY position")
|
||||
.bind(module.id)
|
||||
.bind(org_ctx.id)
|
||||
.fetch_all(&pool)
|
||||
.await
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
@@ -285,11 +319,13 @@ pub async fn get_course_outline(
|
||||
}
|
||||
|
||||
pub async fn get_lesson_content(
|
||||
Org(org_ctx): Org,
|
||||
State(pool): State<PgPool>,
|
||||
Path(id): Path<Uuid>,
|
||||
) -> Result<Json<Lesson>, StatusCode> {
|
||||
let lesson = sqlx::query_as::<_, Lesson>("SELECT * FROM lessons WHERE id = $1")
|
||||
let lesson = sqlx::query_as::<_, Lesson>("SELECT * FROM lessons WHERE id = $1 AND organization_id = $2")
|
||||
.bind(id)
|
||||
.bind(org_ctx.id)
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.map_err(|_| StatusCode::NOT_FOUND)?;
|
||||
@@ -298,11 +334,13 @@ pub async fn get_lesson_content(
|
||||
}
|
||||
|
||||
pub async fn get_user_enrollments(
|
||||
Org(org_ctx): Org,
|
||||
State(pool): State<PgPool>,
|
||||
Path(user_id): Path<Uuid>,
|
||||
) -> Result<Json<Vec<Enrollment>>, StatusCode> {
|
||||
let enrollments = sqlx::query_as::<_, Enrollment>("SELECT * FROM enrollments WHERE user_id = $1")
|
||||
let enrollments = sqlx::query_as::<_, Enrollment>("SELECT * FROM enrollments WHERE user_id = $1 AND organization_id = $2")
|
||||
.bind(user_id)
|
||||
.bind(org_ctx.id)
|
||||
.fetch_all(&pool)
|
||||
.await
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
@@ -311,12 +349,14 @@ pub async fn get_user_enrollments(
|
||||
}
|
||||
|
||||
pub async fn submit_lesson_score(
|
||||
Org(org_ctx): Org,
|
||||
State(pool): State<PgPool>,
|
||||
Json(payload): Json<GradeSubmissionPayload>,
|
||||
) -> Result<Json<common::models::UserGrade>, (StatusCode, String)> {
|
||||
// 1. Get lesson attempt rules
|
||||
let max_attempts: Option<Option<i32>> = sqlx::query_scalar("SELECT max_attempts FROM lessons WHERE id = $1")
|
||||
let max_attempts: Option<Option<i32>> = sqlx::query_scalar("SELECT max_attempts FROM lessons WHERE id = $1 AND organization_id = $2")
|
||||
.bind(payload.lesson_id)
|
||||
.bind(org_ctx.id)
|
||||
.fetch_optional(&pool)
|
||||
.await
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
|
||||
@@ -327,9 +367,10 @@ pub async fn submit_lesson_score(
|
||||
let max_attempts = max_attempts.flatten();
|
||||
|
||||
// 2. Check existing grade/attempts
|
||||
let existing_attempts: Option<i32> = sqlx::query_scalar("SELECT attempts_count FROM user_grades WHERE user_id = $1 AND lesson_id = $2")
|
||||
let existing_attempts: Option<i32> = sqlx::query_scalar("SELECT attempts_count FROM user_grades WHERE user_id = $1 AND lesson_id = $2 AND organization_id = $3")
|
||||
.bind(payload.user_id)
|
||||
.bind(payload.lesson_id)
|
||||
.bind(org_ctx.id)
|
||||
.fetch_optional(&pool)
|
||||
.await
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
|
||||
@@ -345,8 +386,8 @@ pub async fn submit_lesson_score(
|
||||
|
||||
// 3. Upsert with increment
|
||||
let grade = sqlx::query_as::<_, common::models::UserGrade>(
|
||||
"INSERT INTO user_grades (user_id, course_id, lesson_id, score, metadata, attempts_count)
|
||||
VALUES ($1, $2, $3, $4, $5, 1)
|
||||
"INSERT INTO user_grades (user_id, course_id, lesson_id, score, metadata, attempts_count, organization_id)
|
||||
VALUES ($1, $2, $3, $4, $5, 1, $6)
|
||||
ON CONFLICT (user_id, lesson_id) DO UPDATE SET
|
||||
score = EXCLUDED.score,
|
||||
metadata = EXCLUDED.metadata,
|
||||
@@ -359,6 +400,7 @@ pub async fn submit_lesson_score(
|
||||
.bind(payload.lesson_id)
|
||||
.bind(payload.score)
|
||||
.bind(payload.metadata)
|
||||
.bind(org_ctx.id)
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
|
||||
@@ -367,14 +409,16 @@ pub async fn submit_lesson_score(
|
||||
}
|
||||
|
||||
pub async fn get_user_course_grades(
|
||||
Org(org_ctx): Org,
|
||||
State(pool): State<PgPool>,
|
||||
Path((user_id, course_id)): Path<(Uuid, Uuid)>,
|
||||
) -> Result<Json<Vec<common::models::UserGrade>>, StatusCode> {
|
||||
let grades = sqlx::query_as::<_, common::models::UserGrade>(
|
||||
"SELECT * FROM user_grades WHERE user_id = $1 AND course_id = $2"
|
||||
"SELECT * FROM user_grades WHERE user_id = $1 AND course_id = $2 AND organization_id = $3"
|
||||
)
|
||||
.bind(user_id)
|
||||
.bind(course_id)
|
||||
.bind(org_ctx.id)
|
||||
.fetch_all(&pool)
|
||||
.await
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
@@ -382,19 +426,22 @@ pub async fn get_user_course_grades(
|
||||
Ok(Json(grades))
|
||||
}
|
||||
pub async fn get_course_analytics(
|
||||
Org(org_ctx): Org,
|
||||
State(pool): State<PgPool>,
|
||||
Path(course_id): Path<Uuid>,
|
||||
) -> Result<Json<CourseAnalytics>, (StatusCode, String)> {
|
||||
// 1. Total Enrollments
|
||||
let total_enrollments: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM enrollments WHERE course_id = $1")
|
||||
let total_enrollments: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM enrollments WHERE course_id = $1 AND organization_id = $2")
|
||||
.bind(course_id)
|
||||
.bind(org_ctx.id)
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
|
||||
|
||||
// 2. Average Course Score (Overall)
|
||||
let average_score: Option<f32> = sqlx::query_scalar("SELECT AVG(score)::float4 FROM user_grades WHERE course_id = $1")
|
||||
let average_score: Option<f32> = sqlx::query_scalar("SELECT AVG(score)::float4 FROM user_grades WHERE course_id = $1 AND organization_id = $2")
|
||||
.bind(course_id)
|
||||
.bind(org_ctx.id)
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
|
||||
@@ -410,12 +457,13 @@ pub async fn get_course_analytics(
|
||||
COUNT(g.id) as submission_count
|
||||
FROM lessons l
|
||||
LEFT JOIN user_grades g ON l.id = g.lesson_id
|
||||
WHERE l.module_id IN (SELECT id FROM modules WHERE course_id = $1)
|
||||
WHERE l.module_id IN (SELECT id FROM modules WHERE course_id = $1) AND l.organization_id = $2
|
||||
GROUP BY l.id, l.title, l.position
|
||||
ORDER BY l.position
|
||||
"#
|
||||
)
|
||||
.bind(course_id)
|
||||
.bind(org_ctx.id)
|
||||
.fetch_all(&pool)
|
||||
.await
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
mod handlers;
|
||||
|
||||
use axum::{
|
||||
routing::{get, post},
|
||||
routing::{get, post, put},
|
||||
Router,
|
||||
middleware,
|
||||
};
|
||||
use tower_http::cors::{Any, CorsLayer};
|
||||
use sqlx::postgres::PgPoolOptions;
|
||||
@@ -33,23 +34,27 @@ async fn main() {
|
||||
.allow_methods(Any)
|
||||
.allow_headers(Any);
|
||||
|
||||
let app = Router::new()
|
||||
.route("/catalog", get(handlers::get_course_catalog))
|
||||
let protected_routes = Router::new()
|
||||
.route("/enroll", post(handlers::enroll_user))
|
||||
.route("/ingest", post(handlers::ingest_course))
|
||||
.route("/auth/register", post(handlers::register))
|
||||
.route("/auth/login", post(handlers::login))
|
||||
.route("/enrollments/{id}", get(handlers::get_user_enrollments))
|
||||
.route("/enrollments/:id", get(handlers::get_user_enrollments))
|
||||
.route("/courses/{id}/outline", get(handlers::get_course_outline))
|
||||
.route("/lessons/{id}", get(handlers::get_lesson_content))
|
||||
.route("/lessons/:id", get(handlers::get_lesson_content))
|
||||
.route("/grades", post(handlers::submit_lesson_score))
|
||||
.route("/users/{user_id}/courses/{course_id}/grades", get(handlers::get_user_course_grades))
|
||||
.route("/courses/{id}/analytics", get(handlers::get_course_analytics))
|
||||
.route_layer(middleware::from_fn(common::middleware::org_extractor_middleware));
|
||||
|
||||
let public_routes = Router::new()
|
||||
.route("/catalog", get(handlers::get_course_catalog))
|
||||
.route("/ingest", post(handlers::ingest_course))
|
||||
.route("/auth/register", post(handlers::register))
|
||||
.route("/auth/login", post(handlers::login))
|
||||
.merge(protected_routes)
|
||||
.layer(cors)
|
||||
.with_state(pool);
|
||||
|
||||
let addr = SocketAddr::from(([0, 0, 0, 0], 3002));
|
||||
tracing::info!("LMS Service listening on {}", addr);
|
||||
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
|
||||
axum::serve(listener, app).await.unwrap();
|
||||
axum::serve(listener, public_routes).await.unwrap();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user