feat: Add generation status to courses, implement cancellation, and enhance AI bridge call with retry logic.

This commit is contained in:
2026-03-04 16:55:31 -03:00
parent 01c54429a0
commit 935e6b9675
4 changed files with 200 additions and 44 deletions
+118 -38
View File
@@ -419,6 +419,12 @@ pub async fn update_course(
.map(|s| s.to_string())
.or(existing.course_image_url);
let generation_status = payload
.get("generation_status")
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.or(existing.generation_status);
// BEGIN TRANSACTION
let mut tx = pool
.begin()
@@ -436,7 +442,7 @@ pub async fn update_course(
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
let course = sqlx::query_as::<_, Course>(
"SELECT * FROM fn_update_course($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)",
"SELECT * FROM fn_update_course($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
)
.bind(id)
.bind(org_ctx.id)
@@ -451,6 +457,7 @@ pub async fn update_course(
.bind(currency)
.bind(marketing_metadata)
.bind(course_image_url)
.bind(generation_status)
.fetch_one(&mut *tx)
.await
.map_err(|e| {
@@ -1453,7 +1460,7 @@ pub async fn run_image_generation_task(
return Ok(());
}
// 2. Call Local Video Bridge (Python)
// 2. Call Local Video Bridge (Python) with Retry Logic
let client = reqwest::Client::new();
let bridge_base_url = std::env::var("LOCAL_VIDEO_BRIDGE_URL")
.unwrap_or_else(|_| "http://t-800:8080".to_string());
@@ -1475,36 +1482,109 @@ pub async fn run_image_generation_task(
let database_url = std::env::var("BRIDGE_DATABASE_URL")
.unwrap_or_else(|_| std::env::var("DATABASE_URL").unwrap_or_default());
let response = client.post(bridge_url)
.json(&serde_json::json!({
"prompt": final_prompt,
"lesson_id": id.to_string(), // The bridge uses lesson_id as a generic id for progress reporting
"database_url": database_url,
"table_name": table_name, // Pass table name so bridge knows where to update progress
"progress_column": progress_col,
"width": width,
"height": height
}))
.send()
.await
.map_err(|e| format!("Failed to call video bridge: {}", e))?;
let mut retry_count = 0;
let max_retries = 3; // Initial quick retries
let long_retry_delay = tokio::time::Duration::from_secs(600); // 10 minutes
if !response.status().is_success() {
let err_text = response.text().await.unwrap_or_default();
// Update error in DB
let _ = sqlx::query(&format!("UPDATE {} SET {} = $1, {} = 'error' WHERE id = $2", table_name, error_col, status_col))
.bind(&err_text)
.bind(id)
.execute(&pool)
loop {
let response_result = client.post(&bridge_url)
.json(&serde_json::json!({
"prompt": final_prompt,
"lesson_id": id.to_string(),
"database_url": database_url,
"table_name": table_name,
"progress_column": progress_col,
"width": width,
"height": height
}))
.send()
.await;
return Err(format!("Video bridge error: {}", err_text));
match response_result {
Ok(response) => {
if response.status().is_success() {
let result: serde_json::Value = response.json().await
.map_err(|e| format!("Failed to parse video bridge response: {}", e))?;
let bridge_content_url = result["url"].as_str()
.ok_or_else(|| "Video bridge response missing URL".to_string())?;
// Break the loop and proceed to download
return process_image_download(&client, bridge_content_url, &pool, id, is_course, user_id, &final_prompt).await;
} else {
let err_text = response.text().await.unwrap_or_default();
let _ = sqlx::query(&format!("UPDATE {} SET {} = $1, {} = 'error' WHERE id = $2", table_name, error_col, status_col))
.bind(&err_text)
.bind(id)
.execute(&pool)
.await;
return Err(format!("Video bridge error: {}", err_text));
}
}
Err(e) => {
tracing::warn!("Failed to reach AI bridge at {}: {}. Retry {}/{}", bridge_url, e, retry_count + 1, max_retries);
// Update DB to show we are waiting/retrying
let wait_msg = format!("El servidor de IA (t-800) no responde. Reintentando automáticamente en 10 min... (Error: {})", e);
let _ = sqlx::query(&format!("UPDATE {} SET {} = $1, {} = 'queued' WHERE id = $2", table_name, error_col, status_col))
.bind(&wait_msg)
.bind(id)
.execute(&pool)
.await;
// Check if task was cancelled while we were about to sleep
let current_status: String = sqlx::query_scalar(&format!("SELECT {} FROM {} WHERE id = $1", status_col, table_name))
.bind(id)
.fetch_one(&pool)
.await
.unwrap_or_else(|_| "error".to_string());
if current_status == "idle" || current_status == "error" {
tracing::info!("Task {} was cancelled or errored during retry. Aborting.", id);
return Ok(());
}
retry_count += 1;
// Wait 10 minutes before next attempt
tokio::time::sleep(long_retry_delay).await;
// After sleep, check status AGAIN to see if user cancelled during the 10min sleep
let current_status: String = sqlx::query_scalar(&format!("SELECT {} FROM {} WHERE id = $1", status_col, table_name))
.bind(id)
.fetch_one(&pool)
.await
.unwrap_or_else(|_| "error".to_string());
if current_status != "queued" {
tracing::info!("Task {} status changed to {} during sleep. Aborting retry.", id, current_status);
return Ok(());
}
// Set back to processing to "lock" it again for this attempt
let rows_affected = sqlx::query(&format!("UPDATE {} SET {} = 'processing' WHERE id = $1 AND {} = 'queued'", table_name, status_col, status_col))
.bind(id)
.execute(&pool)
.await
.map_err(|e| e.to_string())?
.rows_affected();
if rows_affected == 0 {
return Ok(());
}
}
}
}
}
let result: serde_json::Value = response.json().await
.map_err(|e| format!("Failed to parse video bridge response: {}", e))?;
let bridge_content_url = result["url"].as_str()
.ok_or_else(|| "Video bridge response missing URL".to_string())?;
async fn process_image_download(
client: &reqwest::Client,
bridge_content_url: &str,
pool: &PgPool,
id: Uuid,
is_course: bool,
user_id: Option<Uuid>,
final_prompt: &str
) -> Result<(), String> {
// --- Download image and store as Asset ---
@@ -1523,7 +1603,7 @@ pub async fn run_image_generation_task(
"SELECT organization_id, id as course_id, instructor_id FROM courses WHERE id = $1"
)
.bind(id)
.fetch_one(&pool)
.fetch_one(pool)
.await
.map_err(|e| format!("Failed to fetch course context: {}", e))?;
c
@@ -1536,7 +1616,7 @@ pub async fn run_image_generation_task(
WHERE l.id = $1"
)
.bind(id)
.fetch_one(&pool)
.fetch_one(pool)
.await
.map_err(|e| format!("Failed to fetch lesson context: {}", e))?
};
@@ -1569,27 +1649,27 @@ pub async fn run_image_generation_task(
.bind(&storage_path)
.bind("image/png")
.bind(size_bytes)
.execute(&pool)
.execute(pool)
.await
.map_err(|e| format!("Failed to register asset: {}", e))?;
// 3. Complete task updating entity with local URL - ONLY if not cancelled (idle)
// 5. Complete task updating entity with local URL - ONLY if not cancelled (idle)
if is_course {
sqlx::query(
sqlx::query(&format!(
"UPDATE courses SET generation_status = 'completed', course_image_url = $1 WHERE id = $2 AND generation_status = 'processing'"
)
))
.bind(local_url)
.bind(id)
.execute(&pool)
.execute(pool)
.await
.map_err(|e| format!("Failed to complete course task: {}", e))?;
} else {
sqlx::query(
sqlx::query(&format!(
"UPDATE lessons SET video_generation_status = 'completed', content_url = $1, content_type = 'image' WHERE id = $2 AND video_generation_status = 'processing'"
)
))
.bind(local_url)
.bind(id)
.execute(&pool)
.execute(pool)
.await
.map_err(|e| format!("Failed to complete lesson task: {}", e))?;
}