feat: Introduce course marketing features with dedicated metadata, image generation, and UI in both studio and experience apps.

This commit is contained in:
2026-03-04 15:41:34 -03:00
parent 4458decd22
commit 01c54429a0
25 changed files with 1453 additions and 401 deletions
@@ -0,0 +1,60 @@
-- Migration: Add Course Marketing Metadata and Image Generation fields
-- 1. Add columns to courses table
ALTER TABLE courses
ADD COLUMN IF NOT EXISTS marketing_metadata JSONB DEFAULT '{}',
ADD COLUMN IF NOT EXISTS course_image_url TEXT,
ADD COLUMN IF NOT EXISTS generation_status VARCHAR(20) DEFAULT 'idle',
ADD COLUMN IF NOT EXISTS generation_progress INTEGER DEFAULT 0,
ADD COLUMN IF NOT EXISTS generation_error TEXT;
-- 2. Update fn_create_course to handle initial marketing_metadata
CREATE OR REPLACE FUNCTION fn_create_course(
p_organization_id UUID,
p_instructor_id UUID,
p_title VARCHAR(255),
p_pacing_mode VARCHAR(50) DEFAULT 'self_paced'
) RETURNS SETOF courses AS $$
BEGIN
RETURN QUERY
INSERT INTO courses (organization_id, instructor_id, title, pacing_mode, marketing_metadata)
VALUES (p_organization_id, p_instructor_id, p_title, p_pacing_mode, '{}')
RETURNING *;
END;
$$ LANGUAGE plpgsql;
-- 3. Update fn_update_course to include marketing fields
CREATE OR REPLACE FUNCTION fn_update_course(
p_id UUID,
p_organization_id UUID,
p_title VARCHAR(255),
p_description TEXT,
p_passing_percentage INTEGER,
p_pacing_mode VARCHAR(50),
p_start_date TIMESTAMPTZ,
p_end_date TIMESTAMPTZ,
p_certificate_template VARCHAR(255) DEFAULT NULL,
p_price DOUBLE PRECISION DEFAULT 0.0,
p_currency VARCHAR(10) DEFAULT 'USD',
p_marketing_metadata JSONB DEFAULT NULL,
p_course_image_url TEXT DEFAULT NULL
) RETURNS SETOF courses AS $$
BEGIN
RETURN QUERY
UPDATE courses
SET title = COALESCE(p_title, title),
description = COALESCE(p_description, description),
passing_percentage = COALESCE(p_passing_percentage, passing_percentage),
pacing_mode = COALESCE(p_pacing_mode, pacing_mode),
start_date = p_start_date,
end_date = p_end_date,
certificate_template = COALESCE(p_certificate_template, certificate_template),
price = COALESCE(p_price, price),
currency = COALESCE(p_currency, currency),
marketing_metadata = COALESCE(p_marketing_metadata, marketing_metadata),
course_image_url = COALESCE(p_course_image_url, course_image_url),
updated_at = NOW()
WHERE id = p_id AND organization_id = p_organization_id
RETURNING *;
END;
$$ LANGUAGE plpgsql;
Binary file not shown.

After

Width:  |  Height:  |  Size: 476 KiB

+68 -13
View File
@@ -3,7 +3,7 @@ import time
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import torch
from diffusers import StableDiffusionPipeline
from diffusers import StableDiffusionPipeline, DPMSolverMultistepScheduler
from PIL import Image
import uuid
@@ -20,14 +20,20 @@ pipe = None
def load_model():
global pipe
print("Loading Stable Diffusion model on CPU...")
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Loading Stable Diffusion model on {device}...")
pipe = StableDiffusionPipeline.from_pretrained(
MODEL_ID,
torch_dtype=torch.float32,
torch_dtype=torch.float16 if device == "cuda" else torch.float32,
)
pipe.to("cpu")
# pipe.enable_model_cpu_offload()
pipe.enable_attention_slicing()
# Use a high-quality scheduler for better detail
pipe.scheduler = DPMSolverMultistepScheduler.from_config(pipe.scheduler.config)
pipe.to(device)
if device == "cpu":
pipe.enable_attention_slicing()
print("Model loaded successfully.")
from contextlib import asynccontextmanager
@@ -43,9 +49,16 @@ app = FastAPI(lifespan=lifespan)
from fastapi.staticfiles import StaticFiles
app.mount("/outputs", StaticFiles(directory=OUTPUT_DIR), name="outputs")
import psycopg2
class ImageRequest(BaseModel):
prompt: str
lesson_id: str
database_url: str = None
table_name: str = "lessons"
progress_column: str = "generation_progress"
width: int = 512
height: int = 512
@app.post("/generate")
async def generate_image(request: ImageRequest):
@@ -53,24 +66,66 @@ async def generate_image(request: ImageRequest):
if pipe is None:
load_model()
num_steps = 150
def progress_callback(step: int, timestep: int, latents: torch.FloatTensor):
if request.database_url and request.lesson_id:
try:
progress = int((step / num_steps) * 100)
conn = psycopg2.connect(request.database_url)
cur = conn.cursor()
# Use psycopg2.sql for safe table/column names if possible,
# but here we'll just format since we control the backend values
query = f"UPDATE {request.table_name} SET {request.progress_column} = %s WHERE id = %s"
cur.execute(query, (progress, request.lesson_id))
conn.commit()
cur.close()
conn.close()
except Exception as db_e:
print(f"Database update error: {db_e}")
def callback_dynamic_cfg(pipe, step_index, timestep, callback_kwargs):
if progress_callback:
progress_callback(step_index, timestep, None)
return callback_kwargs
try:
print(f"Generating image for prompt: {request.prompt}")
quality_prompt = f"{request.prompt}, highly detailed, high quality, masterpiece, 8k, realistic, photographic, sharp focus, perfect anatomy"
negative_prompt = "deformed, distorted, disfigured, poorly drawn, bad anatomy, wrong anatomy, extra limb, missing limb, floating limbs, disconnected limbs, mutation, mutated, ugly, disgusting, blurry, low quality, low resolution, bad hands, extra fingers, cartoon, anime, illustration, draft, grainy"
print(f"Generating image ({request.width}x{request.height}) for prompt: {quality_prompt}")
generator = torch.manual_seed(42)
# Using a small number of steps for speed since it's on CPU
# Generation with custom resolution
image = pipe(
request.prompt,
num_inference_steps=20,
generator=generator
quality_prompt,
negative_prompt=negative_prompt,
num_inference_steps=num_steps,
guidance_scale=8.5,
width=request.width,
height=request.height,
callback_on_step_end=callback_dynamic_cfg
).images[0]
# Ensure progress is 100% at the end
if request.database_url and request.lesson_id:
try:
conn = psycopg2.connect(request.database_url)
cur = conn.cursor()
query = f"UPDATE {request.table_name} SET {request.progress_column} = 100 WHERE id = %s"
cur.execute(query, (request.lesson_id,))
conn.commit()
cur.close()
conn.close()
except:
pass
image_filename = f"image_{request.lesson_id}_{uuid.uuid4().hex[:8]}.png"
image_path = os.path.join(OUTPUT_DIR, image_filename)
image.save(image_path)
# Return the absolute URL pointing to t-800 so the frontend can find it
hostname = os.getenv("BRIDGE_HOSTNAME", "localhost")
hostname = os.getenv("BRIDGE_HOSTNAME", "t-800")
full_url = f"http://{hostname}:8080/outputs/{image_filename}"
return {"status": "completed", "url": full_url}
+218 -23
View File
@@ -408,6 +408,17 @@ pub async fn update_course(
.map(|s| s.to_string())
.unwrap_or(existing.currency);
let marketing_metadata = payload
.get("marketing_metadata")
.cloned()
.or(existing.marketing_metadata);
let course_image_url = payload
.get("course_image_url")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.or(existing.course_image_url);
// BEGIN TRANSACTION
let mut tx = pool
.begin()
@@ -425,7 +436,7 @@ pub async fn update_course(
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let course = sqlx::query_as::<_, Course>(
"SELECT * FROM fn_update_course($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)",
"SELECT * FROM fn_update_course($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)",
)
.bind(id)
.bind(org_ctx.id)
@@ -438,6 +449,8 @@ pub async fn update_course(
.bind(certificate_template)
.bind(price)
.bind(currency)
.bind(marketing_metadata)
.bind(course_image_url)
.fetch_one(&mut *tx)
.await
.map_err(|e| {
@@ -806,13 +819,19 @@ pub async fn run_transcription_task(pool: PgPool, lesson_id: Uuid) -> Result<(),
let filename = url.trim_start_matches("/assets/");
let file_path = format!("uploads/{}", filename);
// 2. Set status to processing
// 2. Set status to processing ONLY if it's still queued (not cancelled/idle)
tracing::info!("Starting transcription for lesson {} (file: {})", lesson_id, file_path);
sqlx::query("UPDATE lessons SET transcription_status = 'processing' WHERE id = $1")
let rows_affected = sqlx::query("UPDATE lessons SET transcription_status = 'processing' WHERE id = $1 AND transcription_status = 'queued'")
.bind(lesson_id)
.execute(&pool)
.await
.map_err(|e| format!("Update to processing failed: {}", e))?;
.map_err(|e| format!("Update to processing failed: {}", e))?
.rows_affected();
if rows_affected == 0 {
tracing::info!("Transcription task {} was cancelled or is already processing. Aborting.", lesson_id);
return Ok(());
}
// 3. Read file
let file_data = tokio::fs::read(&file_path)
@@ -878,8 +897,8 @@ pub async fn run_transcription_task(pool: PgPool, lesson_id: Uuid) -> Result<(),
}
}
// 6. Update lesson with bilinguial transcription
sqlx::query("UPDATE lessons SET transcription = $1, transcription_status = 'completed' WHERE id = $2")
// 6. Update lesson with bilinguial transcription - ONLY if not cancelled (idle)
sqlx::query("UPDATE lessons SET transcription = $1, transcription_status = 'completed' WHERE id = $2 AND transcription_status = 'processing'")
.bind(&transcription_result)
.bind(lesson_id)
.execute(&pool)
@@ -1286,6 +1305,8 @@ pub async fn generate_quiz(
#[derive(Deserialize)]
pub struct VideoAIRequest {
pub prompt: Option<String>,
pub width: Option<u32>,
pub height: Option<u32>,
}
pub async fn generate_image(
@@ -1334,8 +1355,11 @@ pub async fn generate_image(
// 3. Spawn background task
let pool_clone = pool.clone();
let prompt_to_task = payload.prompt.clone();
let user_id = claims.sub;
let width = payload.width;
let height = payload.height;
tokio::spawn(async move {
if let Err(e) = run_image_generation_task(pool_clone, id, prompt_to_task).await {
if let Err(e) = run_image_generation_task(pool_clone, id, prompt_to_task, Some(user_id), false, width, height).await {
tracing::error!("Image generation task failed for lesson {}: {}", id, e);
}
});
@@ -1343,36 +1367,123 @@ pub async fn generate_image(
Ok(Json(updated_lesson))
}
pub async fn run_image_generation_task(pool: PgPool, lesson_id: Uuid, custom_prompt: Option<String>) -> Result<(), String> {
// 1. Set status to processing
sqlx::query("UPDATE lessons SET video_generation_status = 'processing' WHERE id = $1")
.bind(lesson_id)
pub async fn generate_course_image(
Org(org_ctx): Org,
claims: common::auth::Claims,
State(pool): State<PgPool>,
Path(id): Path<Uuid>,
Json(payload): Json<VideoAIRequest>,
) -> Result<Json<Course>, StatusCode> {
// 1. Fetch course
let _course =
sqlx::query_as::<_, Course>("SELECT * FROM courses WHERE id = $1 AND organization_id = $2")
.bind(id)
.bind(org_ctx.id)
.fetch_one(&pool)
.await
.map_err(|_| StatusCode::NOT_FOUND)?;
// 2. Set status to queued
let updated_course = sqlx::query_as::<_, Course>(
"UPDATE courses SET generation_status = 'queued', generation_error = NULL WHERE id = $1 RETURNING *",
)
.bind(id)
.fetch_one(&pool)
.await
.map_err(|e| {
tracing::error!("Database update failed (course image queued): {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
log_action(
&pool,
org_ctx.id,
claims.sub,
"COURSE_IMAGE_GENERATION_QUEUED",
"Course",
id,
json!({ "status": "queued" }),
)
.await;
// 3. Spawn background task
let pool_clone = pool.clone();
let prompt_to_task = payload.prompt.clone();
let user_id = claims.sub;
let width = payload.width;
let height = payload.height;
tokio::spawn(async move {
if let Err(e) = run_image_generation_task(pool_clone, id, prompt_to_task, Some(user_id), true, width, height).await {
tracing::error!("Image generation task failed for course {}: {}", id, e);
}
});
Ok(Json(updated_course))
}
pub async fn run_image_generation_task(
pool: PgPool,
id: Uuid,
custom_prompt: Option<String>,
user_id: Option<Uuid>,
is_course: bool,
width: Option<u32>,
height: Option<u32>
) -> Result<(), String> {
let (status_col, progress_col, error_col, table_name) = if is_course {
("generation_status", "generation_progress", "generation_error", "courses")
} else {
("video_generation_status", "generation_progress", "video_generation_error", "lessons")
};
// 1. Set status to processing ONLY if it's still queued (not cancelled/idle)
let query = format!(
"UPDATE {} SET {} = 'processing' WHERE id = $1 AND {} = 'queued'",
table_name, status_col, status_col
);
let rows_affected = sqlx::query(&query)
.bind(id)
.execute(&pool)
.await
.map_err(|e| format!("Update to processing failed: {}", e))?;
.map_err(|e| format!("Update to processing failed: {}", e))?
.rows_affected();
if rows_affected == 0 {
tracing::info!("Task {} was cancelled or is already processing. Aborting background thread.", id);
return Ok(());
}
// 2. Call Local Video Bridge (Python)
let client = reqwest::Client::new();
let bridge_base_url = std::env::var("LOCAL_VIDEO_BRIDGE_URL")
.unwrap_or_else(|_| "http://localhost:8080".to_string());
.unwrap_or_else(|_| "http://t-800:8080".to_string());
let bridge_url = format!("{}/generate", bridge_base_url);
// Fallback logic for prompt: Custom Prompt > Title
let final_prompt = match custom_prompt {
Some(p) if !p.is_empty() => p,
_ => {
sqlx::query_scalar("SELECT title FROM lessons WHERE id = $1")
.bind(lesson_id)
let title: String = sqlx::query_scalar(&format!("SELECT title FROM {} WHERE id = $1", table_name))
.bind(id)
.fetch_one(&pool)
.await
.map_err(|e| format!("Failed to fetch fallback prompt: {}", e))?
.map_err(|e| format!("Failed to fetch fallback prompt: {}", e))?;
title
}
};
let database_url = std::env::var("BRIDGE_DATABASE_URL")
.unwrap_or_else(|_| std::env::var("DATABASE_URL").unwrap_or_default());
let response = client.post(bridge_url)
.json(&serde_json::json!({
"prompt": final_prompt,
"lesson_id": lesson_id.to_string()
"lesson_id": id.to_string(), // The bridge uses lesson_id as a generic id for progress reporting
"database_url": database_url,
"table_name": table_name, // Pass table name so bridge knows where to update progress
"progress_column": progress_col,
"width": width,
"height": height
}))
.send()
.await
@@ -1380,24 +1491,108 @@ pub async fn run_image_generation_task(pool: PgPool, lesson_id: Uuid, custom_pro
if !response.status().is_success() {
let err_text = response.text().await.unwrap_or_default();
// Update error in DB
let _ = sqlx::query(&format!("UPDATE {} SET {} = $1, {} = 'error' WHERE id = $2", table_name, error_col, status_col))
.bind(&err_text)
.bind(id)
.execute(&pool)
.await;
return Err(format!("Video bridge error: {}", err_text));
}
let result: serde_json::Value = response.json().await
.map_err(|e| format!("Failed to parse video bridge response: {}", e))?;
let content_url = result["url"].as_str()
let bridge_content_url = result["url"].as_str()
.ok_or_else(|| "Video bridge response missing URL".to_string())?;
// 3. Complete task
// --- Download image and store as Asset ---
// 1. Download from bridge
let image_bytes = client.get(bridge_content_url)
.send()
.await
.map_err(|e| format!("Failed to download generated image: {}", e))?
.bytes()
.await
.map_err(|e| format!("Failed to read image bytes: {}", e))?;
// 2. Fetch context
let (org_id, course_id, instructor_id): (Uuid, Uuid, Uuid) = if is_course {
let c = sqlx::query_as::<sqlx::Postgres, (Uuid, Uuid, Uuid)>(
"SELECT organization_id, id as course_id, instructor_id FROM courses WHERE id = $1"
)
.bind(id)
.fetch_one(&pool)
.await
.map_err(|e| format!("Failed to fetch course context: {}", e))?;
c
} else {
sqlx::query_as::<sqlx::Postgres, (Uuid, Uuid, Uuid)>(
"SELECT l.organization_id, m.course_id, c.instructor_id
FROM lessons l
JOIN modules m ON l.module_id = m.id
JOIN courses c ON m.course_id = c.id
WHERE l.id = $1"
)
.bind(id)
.fetch_one(&pool)
.await
.map_err(|e| format!("Failed to fetch lesson context: {}", e))?
};
let final_user_id = user_id.unwrap_or(instructor_id);
// 3. Save file locally
let asset_id = Uuid::new_v4();
let storage_filename = format!("{}.png", asset_id);
let storage_path = format!("uploads/{}", storage_filename);
let _ = tokio::fs::create_dir_all("uploads").await;
tokio::fs::write(&storage_path, &image_bytes).await.map_err(|e| e.to_string())?;
let size_bytes = image_bytes.len() as i64;
let local_url = format!("/assets/{}", storage_filename);
// 4. Register in assets table
sqlx::query(
"UPDATE lessons SET video_generation_status = 'completed', content_url = $1, content_type = 'image' WHERE id = $2"
r#"
INSERT INTO assets (id, organization_id, uploaded_by, course_id, filename, storage_path, mimetype, size_bytes)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
"#,
)
.bind(content_url)
.bind(lesson_id)
.bind(asset_id)
.bind(org_id)
.bind(final_user_id)
.bind(course_id)
.bind(format!("AI: {}", final_prompt))
.bind(&storage_path)
.bind("image/png")
.bind(size_bytes)
.execute(&pool)
.await
.map_err(|e| format!("Update to completed failed: {}", e))?;
.map_err(|e| format!("Failed to register asset: {}", e))?;
// 3. Complete task updating entity with local URL - ONLY if not cancelled (idle)
if is_course {
sqlx::query(
"UPDATE courses SET generation_status = 'completed', course_image_url = $1 WHERE id = $2 AND generation_status = 'processing'"
)
.bind(local_url)
.bind(id)
.execute(&pool)
.await
.map_err(|e| format!("Failed to complete course task: {}", e))?;
} else {
sqlx::query(
"UPDATE lessons SET video_generation_status = 'completed', content_url = $1, content_type = 'image' WHERE id = $2 AND video_generation_status = 'processing'"
)
.bind(local_url)
.bind(id)
.execute(&pool)
.await
.map_err(|e| format!("Failed to complete lesson task: {}", e))?;
}
Ok(())
}
+14 -10
View File
@@ -15,6 +15,7 @@ pub struct BackgroundTask {
pub course_title: Option<String>,
pub transcription_status: Option<String>,
pub video_generation_status: Option<String>,
pub generation_progress: Option<i32>,
pub updated_at: chrono::DateTime<chrono::Utc>,
}
@@ -36,6 +37,7 @@ pub async fn get_background_tasks(
c.title as course_title,
l.transcription_status,
l.video_generation_status,
l.generation_progress,
l.updated_at
FROM lessons l
JOIN modules m ON l.module_id = m.id
@@ -105,16 +107,18 @@ pub async fn cancel_task(
// We can't easily kill a running tokio task unless we had a handle map, which we don't.
// So this is effectively "Dismiss".
sqlx::query("UPDATE lessons SET transcription_status = 'idle' WHERE id = $1")
.bind(id)
.execute(&pool)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to cancel task: {}", e),
)
})?;
sqlx::query(
"UPDATE lessons SET transcription_status = 'idle', video_generation_status = 'idle' WHERE id = $1"
)
.bind(id)
.execute(&pool)
.await
.map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to cancel task: {}", e),
)
})?;
Ok(StatusCode::NO_CONTENT)
}
+2 -1
View File
@@ -92,7 +92,7 @@ async fn main() {
for lesson_id in queued_video_lessons {
tracing::info!("Processing video generation for lesson: {}", lesson_id);
if let Err(e) =
handlers::run_image_generation_task(worker_pool.clone(), lesson_id, None).await
handlers::run_image_generation_task(worker_pool.clone(), lesson_id, None, None, false, None, None).await
{
tracing::error!("Image generation task failed for lesson {}: {}", lesson_id, e);
let _ = sqlx::query(
@@ -176,6 +176,7 @@ async fn main() {
.route("/lessons/{id}/summarize", post(handlers::summarize_lesson))
.route("/lessons/{id}/generate-quiz", post(handlers::generate_quiz))
.route("/lessons/{id}/generate-image", post(handlers::generate_image))
.route("/courses/{id}/generate-image", post(handlers::generate_course_image))
.route("/courses/generate", post(handlers::generate_course))
.route("/courses/{id}/export", get(handlers::export_course))
.route("/courses/import", post(handlers::import_course))