feat: Introduce multi-tenancy support with organization-specific data, add interactive transcript functionality, and enhance lesson/course schemas.

This commit is contained in:
2026-01-15 18:02:04 -03:00
parent daeda7e905
commit 663950aa0e
26 changed files with 933 additions and 302 deletions
@@ -0,0 +1,101 @@
-- Migration: Make user_id nullable in audit_logs
-- This allows logging system actions and registrations where no session user exists yet.
ALTER TABLE audit_logs ALTER COLUMN user_id DROP NOT NULL;
-- Also update the trigger to use the new user's ID as the actor for registrations if no session user is set
CREATE OR REPLACE FUNCTION fn_trigger_audit_log()
RETURNS TRIGGER AS $$
DECLARE
v_user_id UUID;
v_org_id UUID;
v_ip INET;
v_user_agent TEXT;
v_event_type VARCHAR(50);
v_old_data JSONB := NULL;
v_new_data JSONB := NULL;
v_action VARCHAR(50);
BEGIN
-- Try to get context from session variables
BEGIN
v_user_id := current_setting('app.current_user_id', true)::UUID;
EXCEPTION WHEN OTHERS THEN
v_user_id := NULL;
END;
BEGIN
v_org_id := current_setting('app.current_org_id', true)::UUID;
EXCEPTION WHEN OTHERS THEN
v_org_id := NULL;
END;
BEGIN
v_ip := current_setting('app.client_ip', true)::INET;
EXCEPTION WHEN OTHERS THEN
v_ip := NULL;
END;
BEGIN
v_user_agent := current_setting('app.user_agent', true);
EXCEPTION WHEN OTHERS THEN
v_user_agent := NULL;
END;
BEGIN
v_event_type := current_setting('app.event_type', true);
EXCEPTION WHEN OTHERS THEN
v_event_type := 'USER_EVENT';
END;
-- Handle different operations
IF (TG_OP = 'DELETE') THEN
v_old_data := to_jsonb(OLD);
v_action := 'DELETE';
ELSIF (TG_OP = 'UPDATE') THEN
v_old_data := to_jsonb(OLD);
v_new_data := to_jsonb(NEW);
v_action := 'UPDATE';
ELSIF (TG_OP = 'INSERT') THEN
v_new_data := to_jsonb(NEW);
v_action := 'INSERT';
-- Special case: For user registration, use the new ID if no session user is set
IF TG_TABLE_NAME = 'users' AND v_user_id IS NULL THEN
v_user_id := NEW.id;
END IF;
END IF;
-- Insert into audit_logs
INSERT INTO audit_logs (
organization_id,
user_id,
action,
entity_type,
entity_id,
event_type,
old_data,
new_data,
ip_address,
user_agent,
changes
)
VALUES (
COALESCE(v_org_id, (CASE WHEN TG_OP = 'DELETE' THEN OLD.organization_id ELSE NEW.organization_id END)),
v_user_id,
v_action,
TG_TABLE_NAME,
CASE WHEN TG_OP = 'DELETE' THEN OLD.id ELSE NEW.id END,
COALESCE(v_event_type, 'USER_EVENT'),
v_old_data,
v_new_data,
v_ip,
v_user_agent,
COALESCE(v_new_data, v_old_data)
);
IF (TG_OP = 'DELETE') THEN
RETURN OLD;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
@@ -0,0 +1,5 @@
-- Add transcription_status to lessons table
ALTER TABLE lessons ADD COLUMN IF NOT EXISTS transcription_status VARCHAR(20) DEFAULT 'idle';
-- Optional: Update existing lessons with transcriptions to 'completed'
UPDATE lessons SET transcription_status = 'completed' WHERE transcription IS NOT NULL AND transcription != '{}'::jsonb;
+98 -48
View File
@@ -51,7 +51,16 @@ pub async fn publish_course(
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
// 4. Fetch Lessons for each Module
// 4. Fetch Organization
let organization = sqlx::query_as::<_, common::models::Organization>(
"SELECT * FROM organizations WHERE id = $1",
)
.bind(org_ctx.id)
.fetch_one(&pool)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
// 5. Fetch Lessons for each Module
for module in modules {
let lessons = sqlx::query_as::<_, Lesson>(
"SELECT * FROM lessons WHERE module_id = $1 ORDER BY position",
@@ -66,15 +75,16 @@ pub async fn publish_course(
let payload = PublishedCourse {
course,
organization,
grading_categories,
modules: pub_modules,
};
// 4. Send to LMS
// Using service name for Docker compatibility
let lms_url = env::var("LMS_INTERNAL_URL").unwrap_or_else(|_| "http://experience:3002".to_string());
let client = reqwest::Client::new();
let res = client
.post("http://lms-service:3002/ingest")
.post(format!("{}/ingest", lms_url))
.json(&payload)
.send()
.await
@@ -105,7 +115,7 @@ pub async fn publish_course(
)
.await;
Ok(StatusCode::OK)
Ok(StatusCode::NO_CONTENT)
}
#[derive(Deserialize)]
@@ -489,6 +499,7 @@ pub async fn process_transcription(
State(pool): State<PgPool>,
Path(id): Path<Uuid>,
) -> Result<Json<Lesson>, StatusCode> {
tracing::info!("Received transcription request for lesson: {}", id);
// 1. Fetch lesson
let lesson = sqlx::query_as::<_, Lesson>("SELECT * FROM lessons WHERE id = $1 AND organization_id = $2")
.bind(id)
@@ -508,13 +519,63 @@ pub async fn process_transcription(
let filename = url.trim_start_matches("/assets/");
let file_path = format!("uploads/{}", filename);
// 2. Read file
let file_data = tokio::fs::read(&file_path).await.map_err(|e| {
tracing::error!("File read failed ({}): {}", file_path, e);
// 2. Read file to verify it exists
if !tokio::fs::metadata(&file_path).await.is_ok() {
tracing::error!("File not found: {}", file_path);
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}
// 3. Set status to queued
let updated_lesson = sqlx::query_as::<_, Lesson>(
"UPDATE lessons SET transcription_status = 'queued' WHERE id = $1 RETURNING *",
)
.bind(id)
.fetch_one(&pool)
.await
.map_err(|e| {
tracing::error!("Database update failed (queued): {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
// 3. Configuration
log_action(
&pool,
org_ctx.id,
claims.sub,
"TRANSCRIPTION_QUEUED",
"Lesson",
id,
json!({ "status": "queued" }),
)
.await;
Ok(Json(updated_lesson))
}
pub async fn run_transcription_task(pool: PgPool, lesson_id: Uuid) -> Result<(), String> {
// 1. Fetch lesson
let lesson = sqlx::query_as::<_, Lesson>("SELECT * FROM lessons WHERE id = $1")
.bind(lesson_id)
.fetch_one(&pool)
.await
.map_err(|e| format!("Lesson fetch failed: {}", e))?;
let url = lesson.content_url.ok_or("No content URL")?;
let filename = url.trim_start_matches("/assets/");
let file_path = format!("uploads/{}", filename);
// 2. Set status to processing
sqlx::query("UPDATE lessons SET transcription_status = 'processing' WHERE id = $1")
.bind(lesson_id)
.execute(&pool)
.await
.map_err(|e| format!("Update to processing failed: {}", e))?;
// 3. Read file
let file_data = tokio::fs::read(&file_path)
.await
.map_err(|e| format!("File read failed ({}): {}", file_path, e))?;
// 4. Configuration
let provider = env::var("AI_PROVIDER").unwrap_or_else(|_| "openai".to_string());
let client = reqwest::Client::new();
@@ -527,7 +588,7 @@ pub async fn process_transcription(
"medium".to_string(),
)
} else {
let api_key = env::var("OPENAI_API_KEY").map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let api_key = env::var("OPENAI_API_KEY").map_err(|_| "Missing OPENAI_API_KEY")?;
(
"https://api.openai.com/v1/audio/transcriptions".to_string(),
format!("Bearer {}", api_key),
@@ -538,7 +599,7 @@ pub async fn process_transcription(
let part = reqwest::multipart::Part::bytes(file_data)
.file_name(filename.to_string())
.mime_str("application/octet-stream")
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
.map_err(|e| format!("Multipart part creation failed: {}", e))?;
let form = reqwest::multipart::Form::new()
.part("file", part)
@@ -550,21 +611,20 @@ pub async fn process_transcription(
request = request.header("Authorization", auth_header);
}
let response = request.send().await.map_err(|e| {
tracing::error!("Transcription request failed: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
let response = request
.send()
.await
.map_err(|e| format!("Transcription request failed: {}", e))?;
if !response.status().is_success() {
let err_body = response.text().await.unwrap_or_default();
tracing::error!("Transcription API error: {}", err_body);
return Err(StatusCode::INTERNAL_SERVER_ERROR);
return Err(format!("Transcription API error: {}", err_body));
}
let whisper_data: serde_json::Value = response.json().await.map_err(|e| {
tracing::error!("Whisper JSON parse failed: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
let whisper_data: serde_json::Value = response
.json()
.await
.map_err(|e| format!("Whisper JSON parse failed: {}", e))?;
// Extract text and segments (cues)
let text = whisper_data["text"].as_str().unwrap_or_default();
@@ -583,35 +643,19 @@ pub async fn process_transcription(
let transcription = json!({
"en": text,
"es": "", // Could add a translation step here
"es": "",
"cues": cues
});
// 4. Update lesson
let updated_lesson = sqlx::query_as::<_, Lesson>(
"UPDATE lessons SET transcription = $1 WHERE id = $2 RETURNING *",
)
.bind(transcription)
.bind(id)
.fetch_one(&pool)
.await
.map_err(|e| {
tracing::error!("Database update failed: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
// 5. Update lesson
sqlx::query("UPDATE lessons SET transcription = $1, transcription_status = 'completed' WHERE id = $2")
.bind(transcription)
.bind(lesson_id)
.execute(&pool)
.await
.map_err(|e| format!("Final database update failed: {}", e))?;
log_action(
&pool,
org_ctx.id,
claims.sub,
"TRANSCRIPTION_PROCESSED",
"Lesson",
id,
json!({}),
)
.await;
Ok(Json(updated_lesson))
Ok(())
}
pub async fn summarize_lesson(
@@ -620,6 +664,7 @@ pub async fn summarize_lesson(
State(pool): State<PgPool>,
Path(id): Path<Uuid>,
) -> Result<Json<Lesson>, StatusCode> {
tracing::info!("Received summarization request for lesson: {}", id);
// 1. Fetch lesson
let lesson = sqlx::query_as::<_, Lesson>("SELECT * FROM lessons WHERE id = $1 AND organization_id = $2")
.bind(id)
@@ -731,6 +776,7 @@ pub async fn generate_quiz(
State(pool): State<PgPool>,
Path(id): Path<Uuid>,
) -> Result<Json<serde_json::Value>, StatusCode> {
tracing::info!("Received quiz generation request for lesson: {}", id);
// 1. Fetch lesson
let lesson = sqlx::query_as::<_, Lesson>("SELECT * FROM lessons WHERE id = $1 AND organization_id = $2")
.bind(id)
@@ -1229,6 +1275,7 @@ pub async fn upload_asset(
State(pool): State<PgPool>,
mut multipart: axum::extract::Multipart,
) -> Result<Json<UploadResponse>, (StatusCode, String)> {
tracing::info!("Starting upload_asset for org: {}", org_ctx.id);
let mut filename = String::new();
let mut data = Vec::new();
let mut mimetype = String::new();
@@ -1302,6 +1349,7 @@ pub async fn upload_asset(
let url = format!("/assets/{}", storage_filename);
tracing::info!("Upload successful: {} -> {}", filename, url);
Ok(Json(UploadResponse {
id: asset_id,
filename,
@@ -1451,8 +1499,9 @@ pub async fn get_course_analytics(
// 4. Fetch from LMS
let client = reqwest::Client::new();
let lms_url = env::var("LMS_INTERNAL_URL").unwrap_or_else(|_| "http://experience:3002".to_string());
let res = client
.get(format!("http://lms-service:3002/courses/{}/analytics", id))
.get(format!("{}/courses/{}/analytics", lms_url, id))
.send()
.await
.map_err(|e| (StatusCode::BAD_GATEWAY, e.to_string()))?;
@@ -1497,10 +1546,11 @@ pub async fn get_advanced_analytics(
// 4. Fetch from LMS
let client = reqwest::Client::new();
let lms_url = env::var("LMS_INTERNAL_URL").unwrap_or_else(|_| "http://experience:3002".to_string());
let res = client
.get(format!(
"http://lms-service:3002/courses/{}/analytics/advanced",
id
"{}/courses/{}/analytics/advanced",
lms_url, id
))
.send()
.await
@@ -119,6 +119,7 @@ pub async fn upload_organization_logo(
log_action(
&pool,
claims.org,
claims.sub,
"UPDATE_LOGO",
"Organization",
@@ -197,6 +198,7 @@ pub async fn update_organization_branding(
log_action(
&pool,
claims.org,
claims.sub,
"UPDATE_BRANDING",
"Organization",
@@ -1,3 +1,14 @@
use axum::{
Json,
extract::{Path, State},
http::StatusCode,
};
use common::models::Module;
use serde_json::json;
use sqlx::PgPool;
use uuid::Uuid;
use crate::handlers::log_action;
pub async fn update_module(
claims: common::auth::Claims,
@@ -24,7 +35,7 @@ pub async fn update_module(
StatusCode::INTERNAL_SERVER_ERROR
})?;
log_action(&pool, claims.sub, "UPDATE", "Module", id, json!(payload)).await;
log_action(&pool, claims.org, claims.sub, "UPDATE", "Module", id, json!(payload)).await;
Ok(Json(updated_module))
}
+40 -1
View File
@@ -6,11 +6,13 @@ mod webhooks;
use axum::{
Router, middleware,
routing::{delete, get, post},
extract::DefaultBodyLimit,
};
use dotenvy::dotenv;
use sqlx::postgres::PgPoolOptions;
use std::env;
use std::net::SocketAddr;
use std::time::Duration;
use tower_http::cors::{Any, CorsLayer};
#[tokio::main]
@@ -25,12 +27,48 @@ async fn main() {
.await
.expect("Failed to connect to database");
// Run migrations automatically
sqlx::migrate!("./migrations")
.run(&pool)
.await
.expect("Failed to run migrations");
// Start AI Background Worker
let worker_pool = pool.clone();
tokio::spawn(async move {
tracing::info!("AI Background Worker started");
loop {
// Check for queued transcriptions
let queued_lessons: Vec<sqlx::types::Uuid> = match sqlx::query_scalar(
"SELECT id FROM lessons WHERE transcription_status = 'queued' LIMIT 5"
)
.fetch_all(&worker_pool)
.await
{
Ok(ids) => ids,
Err(e) => {
tracing::error!("Failed to fetch queued lessons: {}", e);
tokio::time::sleep(Duration::from_secs(10)).await;
continue;
}
};
for lesson_id in queued_lessons {
tracing::info!("Processing transcription for lesson: {}", lesson_id);
if let Err(e) = handlers::run_transcription_task(worker_pool.clone(), lesson_id).await {
tracing::error!("Transcription task failed for lesson {}: {}", lesson_id, e);
let _ = sqlx::query(
"UPDATE lessons SET transcription_status = 'failed' WHERE id = $1"
)
.bind(lesson_id)
.execute(&worker_pool)
.await;
}
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
});
let cors = CorsLayer::new()
.allow_origin(Any)
.allow_methods(Any)
@@ -92,6 +130,7 @@ async fn main() {
.route("/users/{id}", axum::routing::put(handlers::update_user))
.route("/audit-logs", get(handlers::get_audit_logs))
.route("/assets/upload", post(handlers::upload_asset))
.layer(DefaultBodyLimit::disable())
.route(
"/organizations",
get(handlers::get_organizations).post(handlers::create_organization),