feat: database-first refactor, unified architecture and visual developer manual
Summary of changes: - Consolidated Studio+CMS and Experience+LMS into unified services. - Moved core business logic (enrollment, grading, auth) to PostgreSQL functions. - Implemented advanced auditing via DB triggers and session context. - Added gamification (XP/Levels/Leaderboards) and logic encapsulation. - Updated installation/diagnostic scripts for the new architecture. - Created a comprehensive Visual Developer Manual in README.md with hardware scaling.
This commit is contained in:
@@ -0,0 +1,40 @@
|
||||
use sqlx::{Postgres, Transaction};
|
||||
|
||||
pub async fn set_session_context(
|
||||
tx: &mut Transaction<'_, Postgres>,
|
||||
user_id: Option<uuid::Uuid>,
|
||||
org_id: Option<uuid::Uuid>,
|
||||
ip: Option<String>,
|
||||
ua: Option<String>,
|
||||
event_type: Option<String>,
|
||||
) -> Result<(), sqlx::Error> {
|
||||
if let Some(uid) = user_id {
|
||||
sqlx::query(&format!("SET LOCAL app.current_user_id = '{}'", uid))
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
}
|
||||
if let Some(oid) = org_id {
|
||||
sqlx::query(&format!("SET LOCAL app.current_org_id = '{}'", oid))
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
}
|
||||
if let Some(ip_addr) = ip {
|
||||
sqlx::query(&format!("SET LOCAL app.client_ip = '{}'", ip_addr))
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
}
|
||||
if let Some(user_agent) = ua {
|
||||
// Use set_config for potentially long strings to avoid SQL injection/formatting issues
|
||||
sqlx::query("SELECT set_config('app.user_agent', $1, true)")
|
||||
.bind(user_agent)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
}
|
||||
if let Some(et) = event_type {
|
||||
sqlx::query("SELECT set_config('app.event_type', $1, true)")
|
||||
.bind(et)
|
||||
.execute(&mut **tx)
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -15,9 +15,10 @@ use sqlx::{PgPool, Row};
|
||||
use uuid::Uuid;
|
||||
|
||||
pub async fn enroll_user(
|
||||
State(pool): State<PgPool>,
|
||||
Org(org_ctx): Org,
|
||||
claims: Claims,
|
||||
State(pool): State<PgPool>,
|
||||
headers: axum::http::HeaderMap,
|
||||
Json(payload): Json<serde_json::Value>,
|
||||
) -> Result<Json<Enrollment>, StatusCode> {
|
||||
let course_id_str = payload
|
||||
@@ -27,18 +28,61 @@ pub async fn enroll_user(
|
||||
let course_id = Uuid::parse_str(course_id_str).map_err(|_| StatusCode::BAD_REQUEST)?;
|
||||
let user_id = claims.sub;
|
||||
|
||||
let enrollment = sqlx::query_as::<_, Enrollment>(
|
||||
"INSERT INTO enrollments (user_id, course_id, organization_id) VALUES ($1, $2, $3) RETURNING *"
|
||||
let mut tx = pool
|
||||
.begin()
|
||||
.await
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
|
||||
let ip = headers
|
||||
.get("x-forwarded-for")
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.or_else(|| headers.get("x-real-ip").and_then(|h| h.to_str().ok()))
|
||||
.map(|s| s.to_string());
|
||||
|
||||
let ua = headers
|
||||
.get("user-agent")
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.map(|s| s.to_string());
|
||||
|
||||
crate::db_util::set_session_context(
|
||||
&mut tx,
|
||||
Some(user_id),
|
||||
Some(org_ctx.id),
|
||||
ip,
|
||||
ua,
|
||||
Some("USER_EVENT".to_string()),
|
||||
)
|
||||
.bind(user_id)
|
||||
.bind(course_id)
|
||||
.bind(org_ctx.id)
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
tracing::error!("Enrollment failed: {}", e);
|
||||
StatusCode::INTERNAL_SERVER_ERROR
|
||||
})?;
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
|
||||
let enrollment = sqlx::query_as::<_, Enrollment>("SELECT * FROM fn_enroll_student($1, $2, $3)")
|
||||
.bind(org_ctx.id)
|
||||
.bind(user_id)
|
||||
.bind(course_id)
|
||||
.fetch_one(&mut *tx)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
tracing::error!("Enrollment failed: {}", e);
|
||||
StatusCode::INTERNAL_SERVER_ERROR
|
||||
})?;
|
||||
|
||||
tx.commit()
|
||||
.await
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
|
||||
// Dispatch Webhook
|
||||
let webhook_service = common::webhooks::WebhookService::new(pool.clone());
|
||||
webhook_service
|
||||
.dispatch(
|
||||
org_ctx.id,
|
||||
"user.enrolled",
|
||||
&serde_json::json!({
|
||||
"user_id": user_id,
|
||||
"course_id": course_id,
|
||||
"enrollment_id": enrollment.id
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
Ok(Json(enrollment))
|
||||
}
|
||||
@@ -397,16 +441,45 @@ pub async fn get_user_enrollments(
|
||||
|
||||
pub async fn submit_lesson_score(
|
||||
Org(org_ctx): Org,
|
||||
claims: Claims,
|
||||
State(pool): State<PgPool>,
|
||||
headers: axum::http::HeaderMap,
|
||||
Json(payload): Json<GradeSubmissionPayload>,
|
||||
) -> Result<Json<common::models::UserGrade>, (StatusCode, String)> {
|
||||
let mut tx = pool
|
||||
.begin()
|
||||
.await
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
|
||||
|
||||
let ip = headers
|
||||
.get("x-forwarded-for")
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.or_else(|| headers.get("x-real-ip").and_then(|h| h.to_str().ok()))
|
||||
.map(|s| s.to_string());
|
||||
|
||||
let ua = headers
|
||||
.get("user-agent")
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.map(|s| s.to_string());
|
||||
|
||||
crate::db_util::set_session_context(
|
||||
&mut tx,
|
||||
Some(claims.sub),
|
||||
Some(org_ctx.id),
|
||||
ip,
|
||||
ua,
|
||||
Some("SYSTEM_EVENT".to_string()),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
|
||||
|
||||
// 1. Get lesson attempt rules
|
||||
let max_attempts: Option<Option<i32>> = sqlx::query_scalar(
|
||||
"SELECT max_attempts FROM lessons WHERE id = $1 AND organization_id = $2",
|
||||
)
|
||||
.bind(payload.lesson_id)
|
||||
.bind(org_ctx.id)
|
||||
.fetch_optional(&pool)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
|
||||
|
||||
@@ -420,19 +493,13 @@ pub async fn submit_lesson_score(
|
||||
.bind(payload.user_id)
|
||||
.bind(payload.lesson_id)
|
||||
.bind(org_ctx.id)
|
||||
.fetch_optional(&pool)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
|
||||
|
||||
if let Some(count) = existing_attempts {
|
||||
if let Some(max) = max_attempts {
|
||||
if count >= max {
|
||||
tracing::warn!(
|
||||
"User {} attempted to resubmit lesson {} but reached max_attempts ({})",
|
||||
payload.user_id,
|
||||
payload.lesson_id,
|
||||
max
|
||||
);
|
||||
return Err((
|
||||
StatusCode::FORBIDDEN,
|
||||
"Maximum attempts reached for this assessment".into(),
|
||||
@@ -441,68 +508,66 @@ pub async fn submit_lesson_score(
|
||||
}
|
||||
}
|
||||
|
||||
// 3. Upsert with increment
|
||||
// 3. Upsert with automated DB logic (XP, Badges)
|
||||
let grade = sqlx::query_as::<_, common::models::UserGrade>(
|
||||
"INSERT INTO user_grades (user_id, course_id, lesson_id, score, metadata, attempts_count, organization_id)
|
||||
VALUES ($1, $2, $3, $4, $5, 1, $6)
|
||||
ON CONFLICT (user_id, lesson_id) DO UPDATE SET
|
||||
score = EXCLUDED.score,
|
||||
metadata = EXCLUDED.metadata,
|
||||
attempts_count = user_grades.attempts_count + 1,
|
||||
created_at = CURRENT_TIMESTAMP
|
||||
RETURNING *"
|
||||
"SELECT * FROM fn_upsert_user_grade($1, $2, $3, $4, $5, $6)",
|
||||
)
|
||||
.bind(org_ctx.id)
|
||||
.bind(payload.user_id)
|
||||
.bind(payload.course_id)
|
||||
.bind(payload.lesson_id)
|
||||
.bind(payload.score)
|
||||
.bind(payload.metadata)
|
||||
.bind(org_ctx.id)
|
||||
.fetch_one(&pool)
|
||||
.fetch_one(&mut *tx)
|
||||
.await
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
|
||||
|
||||
// 4. Grant Points
|
||||
let points_to_grant = 20; // Base points for any lesson
|
||||
let _ = sqlx::query(
|
||||
"INSERT INTO points_log (user_id, organization_id, amount, reason, entity_type, entity_id) VALUES ($1, $2, $3, $4, $5, $6)"
|
||||
tx.commit()
|
||||
.await
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
|
||||
|
||||
// 4. Dispatch Webhooks
|
||||
let webhook_service = common::webhooks::WebhookService::new(pool.clone());
|
||||
|
||||
// lesson.completed
|
||||
webhook_service
|
||||
.dispatch(
|
||||
org_ctx.id,
|
||||
"lesson.completed",
|
||||
&serde_json::json!({
|
||||
"user_id": payload.user_id,
|
||||
"course_id": payload.course_id,
|
||||
"lesson_id": payload.lesson_id,
|
||||
"score": payload.score
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
// TODO: Detect course completion logic
|
||||
let total_lessons: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM lessons WHERE module_id IN (SELECT id FROM modules WHERE course_id = $1)")
|
||||
.bind(payload.course_id)
|
||||
.fetch_one(&pool).await.unwrap_or(0);
|
||||
|
||||
let completed_lessons: i64 = sqlx::query_scalar(
|
||||
"SELECT COUNT(*) FROM user_grades WHERE user_id = $1 AND course_id = $2",
|
||||
)
|
||||
.bind(payload.user_id)
|
||||
.bind(org_ctx.id)
|
||||
.bind(points_to_grant)
|
||||
.bind("lesson_completion")
|
||||
.bind("lesson")
|
||||
.bind(payload.lesson_id)
|
||||
.execute(&pool)
|
||||
.await;
|
||||
.bind(payload.course_id)
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.unwrap_or(0);
|
||||
|
||||
// 5. Check for new badges (Trigger-like logic in code)
|
||||
// For now, very simple: if they reached a points threshold
|
||||
let total_points: i64 =
|
||||
sqlx::query_scalar("SELECT COALESCE(SUM(amount), 0) FROM points_log WHERE user_id = $1")
|
||||
.bind(payload.user_id)
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.unwrap_or(0);
|
||||
|
||||
let eligible_badges = sqlx::query(
|
||||
"SELECT id FROM badges WHERE organization_id = $1 AND requirement_type = 'points' AND requirement_value <= $2 AND id NOT IN (SELECT badge_id FROM user_badges WHERE user_id = $3)"
|
||||
)
|
||||
.bind(org_ctx.id)
|
||||
.bind(total_points as i32)
|
||||
.bind(payload.user_id)
|
||||
.fetch_all(&pool)
|
||||
.await;
|
||||
|
||||
if let Ok(new_badges) = eligible_badges {
|
||||
for b in new_badges {
|
||||
let badge_id: Uuid = b.get("id");
|
||||
let _ = sqlx::query("INSERT INTO user_badges (user_id, badge_id) VALUES ($1, $2) ON CONFLICT DO NOTHING")
|
||||
.bind(payload.user_id)
|
||||
.bind(badge_id)
|
||||
.execute(&pool)
|
||||
.await;
|
||||
}
|
||||
if total_lessons > 0 && completed_lessons >= total_lessons {
|
||||
webhook_service
|
||||
.dispatch(
|
||||
org_ctx.id,
|
||||
"course.completed",
|
||||
&serde_json::json!({
|
||||
"user_id": payload.user_id,
|
||||
"course_id": payload.course_id
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
Ok(Json(grade))
|
||||
@@ -511,6 +576,7 @@ pub async fn submit_lesson_score(
|
||||
#[derive(serde::Serialize)]
|
||||
pub struct GamificationStatus {
|
||||
pub points: i64,
|
||||
pub level: i32,
|
||||
pub badges: Vec<BadgeResponse>,
|
||||
}
|
||||
|
||||
@@ -528,12 +594,11 @@ pub async fn get_user_gamification(
|
||||
State(pool): State<PgPool>,
|
||||
Path(user_id): Path<Uuid>,
|
||||
) -> Result<Json<GamificationStatus>, StatusCode> {
|
||||
let points: i64 =
|
||||
sqlx::query_scalar("SELECT COALESCE(SUM(amount), 0) FROM points_log WHERE user_id = $1")
|
||||
.bind(user_id)
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
let user_stats: (i32, i32) = sqlx::query_as("SELECT xp, level FROM users WHERE id = $1")
|
||||
.bind(user_id)
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
|
||||
let badges = sqlx::query_as::<_, BadgeResponse>(
|
||||
"SELECT b.id, b.name, b.description, b.icon_url, ub.earned_at
|
||||
@@ -546,7 +611,42 @@ pub async fn get_user_gamification(
|
||||
.await
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
|
||||
Ok(Json(GamificationStatus { points, badges }))
|
||||
Ok(Json(GamificationStatus {
|
||||
points: user_stats.0 as i64,
|
||||
level: user_stats.1,
|
||||
badges,
|
||||
}))
|
||||
}
|
||||
|
||||
pub async fn get_leaderboard(
|
||||
Org(org_ctx): Org,
|
||||
State(pool): State<PgPool>,
|
||||
) -> Result<Json<Vec<UserResponse>>, StatusCode> {
|
||||
let top_users = sqlx::query_as::<_, User>(
|
||||
"SELECT * FROM users WHERE organization_id = $1 ORDER BY xp DESC LIMIT 10",
|
||||
)
|
||||
.bind(org_ctx.id)
|
||||
.fetch_all(&pool)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
tracing::error!("Failed to fetch leaderboard: {}", e);
|
||||
StatusCode::INTERNAL_SERVER_ERROR
|
||||
})?;
|
||||
|
||||
let response = top_users
|
||||
.into_iter()
|
||||
.map(|u| UserResponse {
|
||||
id: u.id,
|
||||
email: u.email,
|
||||
full_name: u.full_name,
|
||||
role: u.role,
|
||||
organization_id: u.organization_id,
|
||||
xp: u.xp,
|
||||
level: u.level,
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(Json(response))
|
||||
}
|
||||
|
||||
pub async fn get_user_course_grades(
|
||||
@@ -630,3 +730,38 @@ pub async fn get_course_analytics(
|
||||
lessons,
|
||||
}))
|
||||
}
|
||||
|
||||
pub async fn get_advanced_analytics(
|
||||
Org(_org_ctx): Org,
|
||||
State(pool): State<PgPool>,
|
||||
Path(course_id): Path<Uuid>,
|
||||
) -> Result<Json<common::models::AdvancedAnalytics>, StatusCode> {
|
||||
// 1. Cohort Analysis using DB function
|
||||
let cohort_data = sqlx::query_as::<_, common::models::CohortData>(
|
||||
"SELECT period, student_count as count, completion_rate FROM fn_get_cohort_analytics($1)",
|
||||
)
|
||||
.bind(course_id)
|
||||
.fetch_all(&pool)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
tracing::error!("Cohort query failed: {}", e);
|
||||
StatusCode::INTERNAL_SERVER_ERROR
|
||||
})?;
|
||||
|
||||
// 2. Retention Analysis using DB function
|
||||
let retention_data = sqlx::query_as::<_, common::models::RetentionData>(
|
||||
"SELECT lesson_id, lesson_title, student_count FROM fn_get_retention_data($1)",
|
||||
)
|
||||
.bind(course_id)
|
||||
.fetch_all(&pool)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
tracing::error!("Retention query failed: {}", e);
|
||||
StatusCode::INTERNAL_SERVER_ERROR
|
||||
})?;
|
||||
|
||||
Ok(Json(common::models::AdvancedAnalytics {
|
||||
cohorts: cohort_data,
|
||||
retention: retention_data,
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
mod db_util;
|
||||
mod handlers;
|
||||
|
||||
use axum::{
|
||||
Router, middleware,
|
||||
routing::{get, post},
|
||||
Router,
|
||||
middleware,
|
||||
};
|
||||
use tower_http::cors::{Any, CorsLayer};
|
||||
use sqlx::postgres::PgPoolOptions;
|
||||
use std::net::SocketAddr;
|
||||
use dotenvy::dotenv;
|
||||
use sqlx::postgres::PgPoolOptions;
|
||||
use std::env;
|
||||
use std::net::SocketAddr;
|
||||
use tower_http::cors::{Any, CorsLayer};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
@@ -40,10 +40,26 @@ async fn main() {
|
||||
.route("/courses/{id}/outline", get(handlers::get_course_outline))
|
||||
.route("/lessons/{id}", get(handlers::get_lesson_content))
|
||||
.route("/grades", post(handlers::submit_lesson_score))
|
||||
.route("/users/{user_id}/courses/{course_id}/grades", get(handlers::get_user_course_grades))
|
||||
.route("/courses/{id}/analytics", get(handlers::get_course_analytics))
|
||||
.route("/users/{id}/gamification", get(handlers::get_user_gamification))
|
||||
.route_layer(middleware::from_fn(common::middleware::org_extractor_middleware));
|
||||
.route(
|
||||
"/users/{user_id}/courses/{course_id}/grades",
|
||||
get(handlers::get_user_course_grades),
|
||||
)
|
||||
.route(
|
||||
"/courses/{id}/analytics",
|
||||
get(handlers::get_course_analytics),
|
||||
)
|
||||
.route(
|
||||
"/courses/{id}/analytics/advanced",
|
||||
get(handlers::get_advanced_analytics),
|
||||
)
|
||||
.route(
|
||||
"/users/{id}/gamification",
|
||||
get(handlers::get_user_gamification),
|
||||
)
|
||||
.route("/analytics/leaderboard", get(handlers::get_leaderboard))
|
||||
.route_layer(middleware::from_fn(
|
||||
common::middleware::org_extractor_middleware,
|
||||
));
|
||||
|
||||
let public_routes = Router::new()
|
||||
.route("/catalog", get(handlers::get_course_catalog))
|
||||
|
||||
Reference in New Issue
Block a user