From 935e6b967550d2cd82266b1cb950268924274ea6 Mon Sep 17 00:00:00 2001 From: Nurfog Date: Wed, 4 Mar 2026 16:55:31 -0300 Subject: [PATCH] feat: Add generation status to courses, implement cancellation, and enhance AI bridge call with retry logic. --- ...304000001_update_course_status_support.sql | 37 +++++ services/cms-service/scripts/video_bridge.py | 21 ++- services/cms-service/src/handlers.rs | 156 +++++++++++++----- .../courses/[id]/marketing/MarketingTab.tsx | 30 +++- 4 files changed, 200 insertions(+), 44 deletions(-) create mode 100644 services/cms-service/migrations/20260304000001_update_course_status_support.sql diff --git a/services/cms-service/migrations/20260304000001_update_course_status_support.sql b/services/cms-service/migrations/20260304000001_update_course_status_support.sql new file mode 100644 index 0000000..c113d54 --- /dev/null +++ b/services/cms-service/migrations/20260304000001_update_course_status_support.sql @@ -0,0 +1,37 @@ +-- Migration: Update fn_update_course to include generation_status +CREATE OR REPLACE FUNCTION fn_update_course( + p_id UUID, + p_organization_id UUID, + p_title VARCHAR(255), + p_description TEXT, + p_passing_percentage INTEGER, + p_pacing_mode VARCHAR(50), + p_start_date TIMESTAMPTZ, + p_end_date TIMESTAMPTZ, + p_certificate_template VARCHAR(255) DEFAULT NULL, + p_price DOUBLE PRECISION DEFAULT 0.0, + p_currency VARCHAR(10) DEFAULT 'USD', + p_marketing_metadata JSONB DEFAULT NULL, + p_course_image_url TEXT DEFAULT NULL, + p_generation_status VARCHAR(20) DEFAULT NULL +) RETURNS SETOF courses AS $$ +BEGIN + RETURN QUERY + UPDATE courses + SET title = COALESCE(p_title, title), + description = COALESCE(p_description, description), + passing_percentage = COALESCE(p_passing_percentage, passing_percentage), + pacing_mode = COALESCE(p_pacing_mode, pacing_mode), + start_date = p_start_date, + end_date = p_end_date, + certificate_template = COALESCE(p_certificate_template, certificate_template), + price = COALESCE(p_price, price), + currency = COALESCE(p_currency, currency), + marketing_metadata = COALESCE(p_marketing_metadata, marketing_metadata), + course_image_url = COALESCE(p_course_image_url, course_image_url), + generation_status = COALESCE(p_generation_status, generation_status), + updated_at = NOW() + WHERE id = p_id AND organization_id = p_organization_id + RETURNING *; +END; +$$ LANGUAGE plpgsql; diff --git a/services/cms-service/scripts/video_bridge.py b/services/cms-service/scripts/video_bridge.py index 56fca70..2d32c1d 100644 --- a/services/cms-service/scripts/video_bridge.py +++ b/services/cms-service/scripts/video_bridge.py @@ -74,14 +74,31 @@ async def generate_image(request: ImageRequest): progress = int((step / num_steps) * 100) conn = psycopg2.connect(request.database_url) cur = conn.cursor() - # Use psycopg2.sql for safe table/column names if possible, - # but here we'll just format since we control the backend values + + # Check for cancellation + status_query = f"SELECT {request.table_name.replace('generation_progress', 'generation_status')} FROM {request.table_name} WHERE id = %s" + # Wait, the column name is fixed based on table. + # courses -> generation_status + # lessons -> video_generation_status + status_col = "generation_status" if request.table_name == "courses" else "video_generation_status" + cur.execute(f"SELECT {status_col} FROM {request.table_name} WHERE id = %s", (request.lesson_id,)) + status = cur.fetchone()[0] + + if status == 'idle': + print(f"Generation for {request.lesson_id} was cancelled. Aborting.") + cur.close() + conn.close() + raise Exception("Generation cancelled by user") + + # Update progress query = f"UPDATE {request.table_name} SET {request.progress_column} = %s WHERE id = %s" cur.execute(query, (progress, request.lesson_id)) conn.commit() cur.close() conn.close() except Exception as db_e: + if "cancelled" in str(db_e).lower(): + raise db_e print(f"Database update error: {db_e}") def callback_dynamic_cfg(pipe, step_index, timestep, callback_kwargs): diff --git a/services/cms-service/src/handlers.rs b/services/cms-service/src/handlers.rs index 5a085cb..675bdaf 100644 --- a/services/cms-service/src/handlers.rs +++ b/services/cms-service/src/handlers.rs @@ -419,6 +419,12 @@ pub async fn update_course( .map(|s| s.to_string()) .or(existing.course_image_url); + let generation_status = payload + .get("generation_status") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) + .or(existing.generation_status); + // BEGIN TRANSACTION let mut tx = pool .begin() @@ -436,7 +442,7 @@ pub async fn update_course( .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; let course = sqlx::query_as::<_, Course>( - "SELECT * FROM fn_update_course($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)", + "SELECT * FROM fn_update_course($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)", ) .bind(id) .bind(org_ctx.id) @@ -451,6 +457,7 @@ pub async fn update_course( .bind(currency) .bind(marketing_metadata) .bind(course_image_url) + .bind(generation_status) .fetch_one(&mut *tx) .await .map_err(|e| { @@ -1453,7 +1460,7 @@ pub async fn run_image_generation_task( return Ok(()); } - // 2. Call Local Video Bridge (Python) + // 2. Call Local Video Bridge (Python) with Retry Logic let client = reqwest::Client::new(); let bridge_base_url = std::env::var("LOCAL_VIDEO_BRIDGE_URL") .unwrap_or_else(|_| "http://t-800:8080".to_string()); @@ -1475,36 +1482,109 @@ pub async fn run_image_generation_task( let database_url = std::env::var("BRIDGE_DATABASE_URL") .unwrap_or_else(|_| std::env::var("DATABASE_URL").unwrap_or_default()); - let response = client.post(bridge_url) - .json(&serde_json::json!({ - "prompt": final_prompt, - "lesson_id": id.to_string(), // The bridge uses lesson_id as a generic id for progress reporting - "database_url": database_url, - "table_name": table_name, // Pass table name so bridge knows where to update progress - "progress_column": progress_col, - "width": width, - "height": height - })) - .send() - .await - .map_err(|e| format!("Failed to call video bridge: {}", e))?; + let mut retry_count = 0; + let max_retries = 3; // Initial quick retries + let long_retry_delay = tokio::time::Duration::from_secs(600); // 10 minutes - if !response.status().is_success() { - let err_text = response.text().await.unwrap_or_default(); - // Update error in DB - let _ = sqlx::query(&format!("UPDATE {} SET {} = $1, {} = 'error' WHERE id = $2", table_name, error_col, status_col)) - .bind(&err_text) - .bind(id) - .execute(&pool) + loop { + let response_result = client.post(&bridge_url) + .json(&serde_json::json!({ + "prompt": final_prompt, + "lesson_id": id.to_string(), + "database_url": database_url, + "table_name": table_name, + "progress_column": progress_col, + "width": width, + "height": height + })) + .send() .await; - return Err(format!("Video bridge error: {}", err_text)); + + match response_result { + Ok(response) => { + if response.status().is_success() { + let result: serde_json::Value = response.json().await + .map_err(|e| format!("Failed to parse video bridge response: {}", e))?; + + let bridge_content_url = result["url"].as_str() + .ok_or_else(|| "Video bridge response missing URL".to_string())?; + + // Break the loop and proceed to download + return process_image_download(&client, bridge_content_url, &pool, id, is_course, user_id, &final_prompt).await; + } else { + let err_text = response.text().await.unwrap_or_default(); + let _ = sqlx::query(&format!("UPDATE {} SET {} = $1, {} = 'error' WHERE id = $2", table_name, error_col, status_col)) + .bind(&err_text) + .bind(id) + .execute(&pool) + .await; + return Err(format!("Video bridge error: {}", err_text)); + } + } + Err(e) => { + tracing::warn!("Failed to reach AI bridge at {}: {}. Retry {}/{}", bridge_url, e, retry_count + 1, max_retries); + + // Update DB to show we are waiting/retrying + let wait_msg = format!("El servidor de IA (t-800) no responde. Reintentando automáticamente en 10 min... (Error: {})", e); + let _ = sqlx::query(&format!("UPDATE {} SET {} = $1, {} = 'queued' WHERE id = $2", table_name, error_col, status_col)) + .bind(&wait_msg) + .bind(id) + .execute(&pool) + .await; + + // Check if task was cancelled while we were about to sleep + let current_status: String = sqlx::query_scalar(&format!("SELECT {} FROM {} WHERE id = $1", status_col, table_name)) + .bind(id) + .fetch_one(&pool) + .await + .unwrap_or_else(|_| "error".to_string()); + + if current_status == "idle" || current_status == "error" { + tracing::info!("Task {} was cancelled or errored during retry. Aborting.", id); + return Ok(()); + } + + retry_count += 1; + // Wait 10 minutes before next attempt + tokio::time::sleep(long_retry_delay).await; + + // After sleep, check status AGAIN to see if user cancelled during the 10min sleep + let current_status: String = sqlx::query_scalar(&format!("SELECT {} FROM {} WHERE id = $1", status_col, table_name)) + .bind(id) + .fetch_one(&pool) + .await + .unwrap_or_else(|_| "error".to_string()); + + if current_status != "queued" { + tracing::info!("Task {} status changed to {} during sleep. Aborting retry.", id, current_status); + return Ok(()); + } + + // Set back to processing to "lock" it again for this attempt + let rows_affected = sqlx::query(&format!("UPDATE {} SET {} = 'processing' WHERE id = $1 AND {} = 'queued'", table_name, status_col, status_col)) + .bind(id) + .execute(&pool) + .await + .map_err(|e| e.to_string())? + .rows_affected(); + + if rows_affected == 0 { + return Ok(()); + } + } + } } +} - let result: serde_json::Value = response.json().await - .map_err(|e| format!("Failed to parse video bridge response: {}", e))?; - - let bridge_content_url = result["url"].as_str() - .ok_or_else(|| "Video bridge response missing URL".to_string())?; +async fn process_image_download( + client: &reqwest::Client, + bridge_content_url: &str, + pool: &PgPool, + id: Uuid, + is_course: bool, + user_id: Option, + final_prompt: &str +) -> Result<(), String> { // --- Download image and store as Asset --- @@ -1523,7 +1603,7 @@ pub async fn run_image_generation_task( "SELECT organization_id, id as course_id, instructor_id FROM courses WHERE id = $1" ) .bind(id) - .fetch_one(&pool) + .fetch_one(pool) .await .map_err(|e| format!("Failed to fetch course context: {}", e))?; c @@ -1536,7 +1616,7 @@ pub async fn run_image_generation_task( WHERE l.id = $1" ) .bind(id) - .fetch_one(&pool) + .fetch_one(pool) .await .map_err(|e| format!("Failed to fetch lesson context: {}", e))? }; @@ -1569,27 +1649,27 @@ pub async fn run_image_generation_task( .bind(&storage_path) .bind("image/png") .bind(size_bytes) - .execute(&pool) + .execute(pool) .await .map_err(|e| format!("Failed to register asset: {}", e))?; - // 3. Complete task updating entity with local URL - ONLY if not cancelled (idle) + // 5. Complete task updating entity with local URL - ONLY if not cancelled (idle) if is_course { - sqlx::query( + sqlx::query(&format!( "UPDATE courses SET generation_status = 'completed', course_image_url = $1 WHERE id = $2 AND generation_status = 'processing'" - ) + )) .bind(local_url) .bind(id) - .execute(&pool) + .execute(pool) .await .map_err(|e| format!("Failed to complete course task: {}", e))?; } else { - sqlx::query( + sqlx::query(&format!( "UPDATE lessons SET video_generation_status = 'completed', content_url = $1, content_type = 'image' WHERE id = $2 AND video_generation_status = 'processing'" - ) + )) .bind(local_url) .bind(id) - .execute(&pool) + .execute(pool) .await .map_err(|e| format!("Failed to complete lesson task: {}", e))?; } diff --git a/web/studio/src/app/courses/[id]/marketing/MarketingTab.tsx b/web/studio/src/app/courses/[id]/marketing/MarketingTab.tsx index 861cffc..f174981 100644 --- a/web/studio/src/app/courses/[id]/marketing/MarketingTab.tsx +++ b/web/studio/src/app/courses/[id]/marketing/MarketingTab.tsx @@ -20,7 +20,8 @@ import { Maximize, Monitor, Square, - Smartphone + Smartphone, + X } from "lucide-react"; interface MarketingTabProps { @@ -131,6 +132,19 @@ export default function MarketingTab({ courseId }: MarketingTabProps) { } }; + const handleCancelGeneration = async () => { + try { + await cmsApi.updateCourse(courseId, { + generation_status: 'idle' + } as any); + setIsGenerating(false); + setCourse(prev => prev ? { ...prev, generation_status: 'idle' } : null); + } catch (err) { + console.error("Cancel failed", err); + alert("Failed to cancel generation."); + } + }; + if (loading) return (
@@ -171,9 +185,17 @@ export default function MarketingTab({ courseId }: MarketingTabProps) { style={{ width: `${course?.generation_progress || 0}%` }} />
-

+

Analysis Phase: {course?.generation_progress || 0}% Complete

+ +
)} @@ -220,8 +242,8 @@ export default function MarketingTab({ courseId }: MarketingTabProps) { key={res.label} onClick={() => setSelectedRes(res)} className={`p-5 rounded-3xl border transition-all flex items-center gap-4 group ${isSelected - ? "bg-blue-600 border-blue-500 text-white shadow-xl shadow-blue-500/20 active:scale-95" - : "bg-white dark:bg-white/5 border-slate-200 dark:border-white/10 text-slate-600 dark:text-gray-400 hover:bg-slate-50 dark:hover:bg-white/10 active:scale-95 shadow-sm" + ? "bg-blue-600 border-blue-500 text-white shadow-xl shadow-blue-500/20 active:scale-95" + : "bg-white dark:bg-white/5 border-slate-200 dark:border-white/10 text-slate-600 dark:text-gray-400 hover:bg-slate-50 dark:hover:bg-white/10 active:scale-95 shadow-sm" }`} >