Refactor audio handling and S3 integration in LMS service

- Removed company-specific template rules from template application logic.
- Enhanced question generation queries to support both 'imported-mysql' and 'imported-material' sources.
- Introduced S3 audio storage functionality, including client setup and audio key generation.
- Updated audio response evaluation to store audio files in S3 or fall back to the DB.
- Added new API routes for asset ingestion and ZIP import in CMS service.
- Implemented role-based access control for audio responses in LMS service.
- Created a smoke test script for validating audio roles and permissions.
- Updated frontend to support course selection in audio evaluations.
This commit is contained in:
2026-04-06 09:11:56 -04:00
parent 4afccb89ef
commit 516a903497
12 changed files with 2476 additions and 166 deletions
+3
View File
@@ -26,3 +26,6 @@ base64 = "0.22"
utoipa.workspace = true
thiserror.workspace = true
http.workspace = true
mime_guess = "2.0"
aws-config = "1"
aws-sdk-s3 = "1"
+346 -29
View File
@@ -5,6 +5,12 @@ use axum::{
response::IntoResponse,
Extension,
};
use aws_config::BehaviorVersion;
use aws_config::meta::region::RegionProviderChain;
use aws_sdk_s3::{
Client as S3Client,
config::{Credentials, Region},
};
use bcrypt::{DEFAULT_COST, hash, verify};
use chrono::{DateTime, Utc};
use common::auth::{Claims, create_jwt};
@@ -88,6 +94,103 @@ use sqlx::{PgPool, Row};
use std::env;
use uuid::Uuid;
/// Configuration for storing audio responses in S3.
///
/// Instances are assembled from environment variables by
/// `get_s3_audio_settings`.
#[derive(Debug, Clone)]
struct S3AudioSettings {
/// Target bucket name, read from `S3_BUCKET`.
bucket: String,
/// Region name, read from `S3_REGION` (defaults to `us-east-2`).
region: String,
/// Optional custom endpoint URL from `S3_ENDPOINT`; `None` when unset or blank.
endpoint: Option<String>,
/// Optional base URL from `S3_PUBLIC_BASE_URL` used when building object URLs;
/// `None` when unset or blank.
public_base_url: Option<String>,
/// Whether the client is built with path-style addressing (`S3_FORCE_PATH_STYLE`).
force_path_style: bool,
}
/// Reads the S3 audio-storage configuration from the environment.
///
/// Returns `None` when `ASSETS_STORAGE` is not `s3` (case-insensitive; the
/// default is `local`) or when `S3_BUCKET` is unset or blank, so callers can
/// fall back to their non-S3 storage path.
///
/// Variables consulted:
/// - `ASSETS_STORAGE`: must equal `s3` to enable S3.
/// - `S3_BUCKET`: required bucket name.
/// - `S3_REGION`: region, defaults to `us-east-2`.
/// - `S3_ENDPOINT`: optional custom endpoint URL.
/// - `S3_PUBLIC_BASE_URL`: optional base URL for generated object URLs.
/// - `S3_FORCE_PATH_STYLE`: `1`, `true` or `yes` (case-insensitive) enables
///   path-style addressing.
///
/// Every value is trimmed, and a blank value is treated the same as an unset
/// one, so an empty `S3_BUCKET` or `S3_REGION` never produces unusable settings.
fn get_s3_audio_settings() -> Option<S3AudioSettings> {
    // One place that defines what "configured" means: present, and non-blank
    // after trimming surrounding whitespace.
    let non_blank = |name: &str| -> Option<String> {
        env::var(name)
            .ok()
            .map(|value| value.trim().to_string())
            .filter(|value| !value.is_empty())
    };

    let storage_mode = non_blank("ASSETS_STORAGE")
        .unwrap_or_else(|| "local".to_string())
        .to_lowercase();
    if storage_mode != "s3" {
        return None;
    }

    let bucket = non_blank("S3_BUCKET")?;
    let region = non_blank("S3_REGION").unwrap_or_else(|| "us-east-2".to_string());
    let endpoint = non_blank("S3_ENDPOINT");
    let public_base_url = non_blank("S3_PUBLIC_BASE_URL");
    let force_path_style = non_blank("S3_FORCE_PATH_STYLE")
        .map(|value| matches!(value.to_lowercase().as_str(), "1" | "true" | "yes"))
        .unwrap_or(false);

    Some(S3AudioSettings {
        bucket,
        region,
        endpoint,
        public_base_url,
        force_path_style,
    })
}
/// Builds an S3 client from the given audio-storage settings.
///
/// The region from `settings` is tried first, then the SDK's default region
/// provider chain. When both `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`
/// are set, they are installed as explicit static credentials; the optional
/// `AWS_SESSION_TOKEN` is forwarded with them so temporary credentials keep
/// working. Otherwise the SDK's default credential resolution is used.
///
/// A custom endpoint and path-style addressing are applied when configured in
/// `settings`.
///
/// # Errors
///
/// The `Result` is kept for callers; no step in this function currently
/// produces an `Err`.
async fn build_s3_audio_client(settings: &S3AudioSettings) -> Result<S3Client, String> {
    let region_provider =
        RegionProviderChain::first_try(Some(Region::new(settings.region.clone())))
            .or_default_provider();
    let mut loader = aws_config::defaults(BehaviorVersion::latest()).region(region_provider);

    if let (Ok(access_key), Ok(secret_key)) = (
        env::var("AWS_ACCESS_KEY_ID"),
        env::var("AWS_SECRET_ACCESS_KEY"),
    ) {
        // Temporary credentials are only valid together with their session
        // token; passing `None` here would make every signed request fail.
        let session_token = env::var("AWS_SESSION_TOKEN")
            .ok()
            .filter(|token| !token.is_empty());
        let creds = Credentials::new(access_key, secret_key, session_token, None, "env");
        loader = loader.credentials_provider(creds);
    }

    let shared = loader.load().await;
    let mut builder = aws_sdk_s3::config::Builder::from(&shared);
    if let Some(endpoint) = &settings.endpoint {
        builder = builder.endpoint_url(endpoint);
    }
    if settings.force_path_style {
        builder = builder.force_path_style(true);
    }

    Ok(S3Client::from_conf(builder.build()))
}
/// Builds the S3 object key for one audio response.
///
/// The key nests the response under its organization, course, lesson and user:
/// `org/{org}/course/{course}/lesson/{lesson}/audio-responses/{user}/{response}.{ext}`.
/// An empty `extension` is replaced by `webm`.
fn build_s3_audio_key(
    org_id: Uuid,
    course_id: Uuid,
    lesson_id: Uuid,
    user_id: Uuid,
    response_id: Uuid,
    extension: &str,
) -> String {
    let suffix = match extension {
        "" => "webm",
        given => given,
    };
    format!(
        "org/{}/course/{}/lesson/{}/audio-responses/{}/{}.{}",
        org_id, course_id, lesson_id, user_id, response_id, suffix
    )
}
/// Builds the URL recorded for an uploaded audio object.
///
/// When `public_base_url` is configured, the key is appended to it (trailing
/// slashes on the base are dropped first). Otherwise a virtual-hosted-style
/// `amazonaws.com` URL is built from the bucket and region.
fn build_s3_audio_public_url(settings: &S3AudioSettings, key: &str) -> String {
    match settings.public_base_url.as_ref() {
        Some(base) => {
            let trimmed_base = base.trim_end_matches('/');
            format!("{}/{}", trimmed_base, key)
        }
        None => format!(
            "https://{}.s3.{}.amazonaws.com/{}",
            settings.bucket, settings.region, key
        ),
    }
}
/// Extracts `(bucket, key)` from a URL that addresses an S3 object.
///
/// Two forms are recognised:
/// - `s3://{bucket}/{key}`
/// - `https://{bucket}.s3.{region}.amazonaws.com/{key}`, the virtual-hosted
///   form that `build_s3_audio_public_url` emits when no public base URL is
///   configured.
///
/// Returns `None` for anything else, and also when the bucket or key would be
/// empty. For the `https` form, URLs whose path carries a query string, a
/// fragment or percent-escapes are left unparsed, because the raw path is then
/// not guaranteed to equal the object key (for example a pre-signed URL);
/// callers keep treating those as plain HTTP URLs.
fn parse_s3_url(url: &str) -> Option<(String, String)> {
    let (bucket, key) = if let Some(rest) = url.strip_prefix("s3://") {
        rest.split_once('/')?
    } else {
        let rest = url.strip_prefix("https://")?;
        let (host, path) = rest.split_once('/')?;
        // Bucket names may contain dots, so split on the LAST ".s3." marker.
        let (bucket, region) = host
            .strip_suffix(".amazonaws.com")?
            .rsplit_once(".s3.")?;
        if region.is_empty() || path.contains(|c| matches!(c, '?' | '#' | '%')) {
            return None;
        }
        (bucket, path)
    };

    if bucket.is_empty() || key.is_empty() {
        return None;
    }
    Some((bucket.to_string(), key.to_string()))
}
fn get_ai_url(var_base: &str, default: &str) -> String {
let env = env::var("ENVIRONMENT").unwrap_or_else(|_| "prod".to_string());
if env == "dev" {
@@ -2270,7 +2373,7 @@ pub async fn evaluate_audio_file(
let form = reqwest::multipart::Form::new()
.part(
"file",
reqwest::multipart::Part::bytes(audio_data.clone()).file_name(filename),
reqwest::multipart::Part::bytes(audio_data.clone()).file_name(filename.clone()),
)
.text("model", "whisper-1")
.text("response_format", "json");
@@ -2401,6 +2504,7 @@ pub async fn evaluate_audio_file(
// 3. Save audio response to database
// Determine status based on evaluation
let status = "ai_evaluated";
let response_id = Uuid::new_v4();
// Get attempt number (check if there's a previous response for this block)
let attempt_number: i32 = sqlx::query_scalar(
@@ -2413,16 +2517,73 @@ pub async fn evaluate_audio_file(
.await
.unwrap_or(1);
// Store audio as base64 for now (can be moved to object storage later)
let audio_base64 = base64::engine::general_purpose::STANDARD.encode(&audio_data);
// Store in S3 when configured; otherwise keep legacy DB storage for compatibility.
let mut audio_url: Option<String> = None;
let mut audio_data_db: Option<Vec<u8>> = None;
if let Some(settings) = get_s3_audio_settings() {
let extension = std::path::Path::new(&filename)
.extension()
.and_then(|v| v.to_str())
.unwrap_or("webm");
let content_type = mime_guess::from_path(&filename)
.first_or_octet_stream()
.to_string();
let key = build_s3_audio_key(
org_ctx.id,
course_id,
lesson_id,
claims.sub,
response_id,
extension,
);
match build_s3_audio_client(&settings).await {
Ok(s3_client) => {
let put_result = s3_client
.put_object()
.bucket(&settings.bucket)
.key(&key)
.content_type(content_type)
.body(audio_data.clone().into())
.send()
.await;
if put_result.is_ok() {
audio_url = Some(build_s3_audio_public_url(&settings, &key));
} else {
// Fallback to DB storage if S3 upload fails.
audio_data_db = Some(
base64::engine::general_purpose::STANDARD
.encode(&audio_data)
.into_bytes(),
);
}
}
Err(_) => {
audio_data_db = Some(
base64::engine::general_purpose::STANDARD
.encode(&audio_data)
.into_bytes(),
);
}
}
} else {
audio_data_db = Some(
base64::engine::general_purpose::STANDARD
.encode(&audio_data)
.into_bytes(),
);
}
let _ = sqlx::query(
r#"INSERT INTO audio_responses
(organization_id, user_id, course_id, lesson_id, block_id, prompt, transcript, audio_data,
(id, organization_id, user_id, course_id, lesson_id, block_id, prompt, transcript, audio_url, audio_data,
ai_score, ai_found_keywords, ai_feedback, ai_evaluated_at,
status, attempt_number, duration_seconds)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, NOW(), $12, $13, $14)"#
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, NOW(), $14, $15, $16)"#
)
.bind(response_id)
.bind(org_ctx.id)
.bind(claims.sub)
.bind(course_id)
@@ -2430,7 +2591,8 @@ pub async fn evaluate_audio_file(
.bind(block_id)
.bind(&prompt)
.bind(&transcript)
.bind(&audio_base64)
.bind(&audio_url)
.bind(&audio_data_db)
.bind(grading.score)
.bind(&grading.found_keywords)
.bind(&grading.feedback)
@@ -2476,6 +2638,37 @@ pub struct AudioResponseFilters {
pub user_id: Option<Uuid>,
}
/// Checks whether an instructor is assigned to a course within an organization.
///
/// Runs an `EXISTS` query over `course_instructors` joined to `courses`, matching
/// the organization, the course and the instructor's user id.
///
/// # Errors
///
/// Logs the database error and returns `StatusCode::INTERNAL_SERVER_ERROR` when
/// the query fails.
async fn instructor_has_course_access(
    pool: &PgPool,
    org_id: Uuid,
    instructor_id: Uuid,
    course_id: Uuid,
) -> Result<bool, StatusCode> {
    let lookup = sqlx::query_scalar::<_, bool>(
        r#"
SELECT EXISTS(
SELECT 1
FROM course_instructors ci
JOIN courses c ON c.id = ci.course_id
WHERE c.organization_id = $1
AND ci.course_id = $2
AND ci.user_id = $3
)
"#,
    )
    .bind(org_id)
    .bind(course_id)
    .bind(instructor_id)
    .fetch_one(pool)
    .await;

    match lookup {
        Ok(has_access) => Ok(has_access),
        Err(e) => {
            tracing::error!("Error validating instructor course access: {}", e);
            Err(StatusCode::INTERNAL_SERVER_ERROR)
        }
    }
}
/// Get all audio responses for teachers
/// Filters: course_id, lesson_id, status (pending, ai_evaluated, teacher_evaluated, both_evaluated), user_id
pub async fn get_audio_responses(
@@ -2489,7 +2682,9 @@ pub async fn get_audio_responses(
return Err(StatusCode::FORBIDDEN);
}
// Use static query with optional filters
let is_instructor = claims.role == "instructor";
// Use static query with optional filters + instructor scoping
let responses = sqlx::query_as::<_, AudioResponseListItem>(
r#"
SELECT
@@ -2517,14 +2712,30 @@ pub async fn get_audio_responses(
JOIN courses c ON ar.course_id = c.id
JOIN lessons l ON ar.lesson_id = l.id
WHERE ar.organization_id = $1
AND ($2::uuid IS NULL OR ar.course_id = $2)
AND ($3::uuid IS NULL OR ar.lesson_id = $3)
AND ($4::text IS NULL OR ar.status::text = $4)
AND ($5::uuid IS NULL OR ar.user_id = $5)
AND (
$2::boolean = false
OR EXISTS (
SELECT 1
FROM course_instructors ci
WHERE ci.organization_id = ar.organization_id
AND ci.course_id = ar.course_id
AND ci.user_id = $3
)
)
AND ($4::uuid IS NULL OR ar.course_id = $4)
AND ($5::uuid IS NULL OR ar.lesson_id = $5)
AND (
$6::text IS NULL
OR ($6::text = 'pending_instructor' AND ar.status::text IN ('pending', 'ai_evaluated'))
OR ($6::text != 'pending_instructor' AND ar.status::text = $6::text)
)
AND ($7::uuid IS NULL OR ar.user_id = $7)
ORDER BY ar.created_at DESC
"#
)
.bind(org_ctx.id)
.bind(is_instructor)
.bind(claims.sub)
.bind(filters.course_id)
.bind(filters.lesson_id)
.bind(filters.status)
@@ -2590,7 +2801,14 @@ pub async fn get_audio_response_detail(
})?;
match response {
Some(r) => Ok(Json(r)),
Some(r) => {
if claims.role == "instructor"
&& !instructor_has_course_access(&pool, org_ctx.id, claims.sub, r.course_id).await?
{
return Err(StatusCode::FORBIDDEN);
}
Ok(Json(r))
}
None => Err(StatusCode::NOT_FOUND),
}
}
@@ -2598,13 +2816,17 @@ pub async fn get_audio_response_detail(
/// Get audio data as base64 for playback
pub async fn get_audio_response_audio(
Org(org_ctx): Org,
_claims: Claims,
claims: Claims,
State(pool): State<PgPool>,
Path(response_id): Path<Uuid>,
) -> Result<impl IntoResponse, StatusCode> {
if claims.role != "admin" && claims.role != "instructor" && claims.role != "student" {
return Err(StatusCode::FORBIDDEN);
}
// Only instructors, admins, and the owner can access
let audio_data: Option<Vec<u8>> = sqlx::query_scalar(
"SELECT audio_data FROM audio_responses WHERE id = $1 AND organization_id = $2"
let row: Option<(Option<Vec<u8>>, Option<String>, Uuid, Uuid)> = sqlx::query_as(
"SELECT audio_data, audio_url, user_id, course_id FROM audio_responses WHERE id = $1 AND organization_id = $2"
)
.bind(response_id)
.bind(org_ctx.id)
@@ -2615,23 +2837,104 @@ pub async fn get_audio_response_audio(
StatusCode::INTERNAL_SERVER_ERROR
})?;
match audio_data {
Some(data) => {
// Decode from base64
let audio_bytes = base64::engine::general_purpose::STANDARD.decode(&data)
match row {
Some((audio_data, audio_url, owner_user_id, course_id)) => {
// Access rules: admin always, instructor only their courses, student only own response.
if claims.role == "student" && claims.sub != owner_user_id {
return Err(StatusCode::FORBIDDEN);
}
if claims.role == "instructor"
&& !instructor_has_course_access(&pool, org_ctx.id, claims.sub, course_id).await?
{
return Err(StatusCode::FORBIDDEN);
}
if let Some(data) = audio_data {
// Legacy path: DB contains base64 bytes.
let audio_bytes = base64::engine::general_purpose::STANDARD
.decode(&data)
.unwrap_or(data);
Ok(
axum::response::Response::builder()
.header(axum::http::header::CONTENT_TYPE, "audio/webm")
.header(axum::http::header::CONTENT_DISPOSITION, "inline")
.body(axum::body::Body::from(audio_bytes))
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
.into_response(),
)
} else if let Some(audio_url) = audio_url {
let (audio_bytes, content_type) = read_audio_response_from_url(&audio_url)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(axum::response::Response::builder()
.header(axum::http::header::CONTENT_TYPE, "audio/webm")
.header(axum::http::header::CONTENT_DISPOSITION, "inline")
.body(axum::body::Body::from(audio_bytes))
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
.into_response())
Ok(
axum::response::Response::builder()
.header(axum::http::header::CONTENT_TYPE, content_type)
.header(axum::http::header::CONTENT_DISPOSITION, "inline")
.body(axum::body::Body::from(audio_bytes))
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
.into_response(),
)
} else {
Err(StatusCode::NOT_FOUND)
}
}
None => Err(StatusCode::NOT_FOUND),
}
}
/// Loads the bytes and content type of a stored audio response.
///
/// URLs that `parse_s3_url` recognises are read through the S3 client, using
/// the bucket and key taken from the URL and the client configuration from
/// `get_s3_audio_settings`. Any other URL is fetched with a plain HTTP GET.
///
/// Returns `(bytes, content_type)`; the content type falls back to
/// `audio/webm` when the source does not report one.
///
/// # Errors
///
/// Returns a descriptive message when the S3 settings are missing, the S3 read
/// fails, the HTTP request fails or times out, or the HTTP status is not a
/// success.
async fn read_audio_response_from_url(url: &str) -> Result<(Vec<u8>, String), String> {
    // Upper bound for the whole HTTP exchange (connect, headers and body), so a
    // stalled remote cannot hold the calling request handler open forever.
    const HTTP_FETCH_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);

    if let Some((bucket, key)) = parse_s3_url(url) {
        let settings = get_s3_audio_settings()
            .ok_or_else(|| "S3 audio settings are missing".to_string())?;
        let client = build_s3_audio_client(&settings).await?;
        let output = client
            .get_object()
            .bucket(bucket)
            .key(key)
            .send()
            .await
            .map_err(|e| format!("S3 read failed: {}", e))?;

        let content_type = output
            .content_type()
            .map(|s| s.to_string())
            .unwrap_or_else(|| "audio/webm".to_string());
        let bytes = output
            .body
            .collect()
            .await
            .map_err(|e| format!("S3 body read failed: {}", e))?
            .into_bytes()
            .to_vec();
        return Ok((bytes, content_type));
    }

    let response = reqwest::Client::new()
        .get(url)
        .timeout(HTTP_FETCH_TIMEOUT)
        .send()
        .await
        .map_err(|e| format!("HTTP audio fetch failed: {}", e))?;
    if !response.status().is_success() {
        return Err(format!("HTTP audio fetch status: {}", response.status()));
    }

    let content_type = response
        .headers()
        .get(reqwest::header::CONTENT_TYPE)
        .and_then(|v| v.to_str().ok())
        .unwrap_or("audio/webm")
        .to_string();
    let bytes = response
        .bytes()
        .await
        .map_err(|e| format!("HTTP audio bytes failed: {}", e))?
        .to_vec();
    Ok((bytes, content_type))
}
/// Teacher evaluates an audio response
pub async fn teacher_evaluate_audio(
Org(org_ctx): Org,
@@ -2651,8 +2954,8 @@ pub async fn teacher_evaluate_audio(
}
// Get current response to determine new status
let current_status: String = sqlx::query_scalar(
"SELECT status::text FROM audio_responses WHERE id = $1 AND organization_id = $2"
let response_meta: Option<(String, Uuid)> = sqlx::query_as(
"SELECT status::text, course_id FROM audio_responses WHERE id = $1 AND organization_id = $2"
)
.bind(response_id)
.bind(org_ctx.id)
@@ -2662,7 +2965,15 @@ pub async fn teacher_evaluate_audio(
tracing::error!("Error fetching audio response: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?
.unwrap_or_else(|| "pending".to_string());
;
let (current_status, course_id) = response_meta.ok_or(StatusCode::NOT_FOUND)?;
if claims.role == "instructor"
&& !instructor_has_course_access(&pool, org_ctx.id, claims.sub, course_id).await?
{
return Err(StatusCode::FORBIDDEN);
}
// Determine new status
let new_status = if current_status == "ai_evaluated" {
@@ -2720,6 +3031,12 @@ pub async fn get_audio_response_stats(
return Err(StatusCode::FORBIDDEN);
}
if claims.role == "instructor"
&& !instructor_has_course_access(&pool, org_ctx.id, claims.sub, course_id).await?
{
return Err(StatusCode::FORBIDDEN);
}
let stats = sqlx::query_as::<_, common::models::AudioResponseStats>(
r#"
SELECT