Add docker-compose.local.yml for local development setup
- Introduced a new docker-compose.local.yml file to facilitate local development. - Disabled nginx-proxy and acme-companion services for local use. - Exposed database and application ports directly to the host for easier access. - Configured PostgreSQL to be accessible on localhost:5433. - Mapped application ports for studio and experience services.
This commit is contained in:
@@ -21,6 +21,7 @@ use std::env;
|
||||
use std::path::Path as StdPath;
|
||||
use tokio::process::Command;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::task::JoinSet;
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct AssetUploadResponse {
|
||||
@@ -161,10 +162,28 @@ async fn maybe_push_local_file_to_s3(
|
||||
let client = build_s3_client(&settings).await?;
|
||||
let key = build_s3_object_key(org_id, course_id, storage_filename);
|
||||
|
||||
let (storage_path, public_url) = push_bytes_to_s3(
|
||||
&client,
|
||||
&settings,
|
||||
&key,
|
||||
mimetype,
|
||||
bytes,
|
||||
)
|
||||
.await?;
|
||||
Ok(Some((storage_path, public_url)))
|
||||
}
|
||||
|
||||
async fn push_bytes_to_s3(
|
||||
client: &S3Client,
|
||||
settings: &S3Settings,
|
||||
key: &str,
|
||||
mimetype: &str,
|
||||
bytes: Vec<u8>,
|
||||
) -> Result<(String, String), (StatusCode, String)> {
|
||||
client
|
||||
.put_object()
|
||||
.bucket(&settings.bucket)
|
||||
.key(&key)
|
||||
.key(key)
|
||||
.content_type(mimetype)
|
||||
.body(bytes.into())
|
||||
.send()
|
||||
@@ -172,8 +191,8 @@ async fn maybe_push_local_file_to_s3(
|
||||
.map_err(|e| (StatusCode::BAD_GATEWAY, format!("S3 upload failed: {}", e)))?;
|
||||
|
||||
let storage_path = format!("s3://{}/{}", settings.bucket, key);
|
||||
let public_url = build_s3_public_url(&settings, &key);
|
||||
Ok(Some((storage_path, public_url)))
|
||||
let public_url = build_s3_public_url(settings, key);
|
||||
Ok((storage_path, public_url))
|
||||
}
|
||||
|
||||
async fn delete_storage_path(storage_path: &str) -> Result<(), (StatusCode, String)> {
|
||||
@@ -637,6 +656,149 @@ struct ZipEntryData {
|
||||
is_flv: bool,
|
||||
}
|
||||
|
||||
async fn process_zip_entry_without_rag(
|
||||
entry: ZipEntryData,
|
||||
org_id: Uuid,
|
||||
user_id: Uuid,
|
||||
pool: PgPool,
|
||||
course_id: Option<Uuid>,
|
||||
english_level: Option<String>,
|
||||
sam_plan_id: Option<i32>,
|
||||
sam_course_id: Option<i32>,
|
||||
split_midpoint: Option<i32>,
|
||||
sam_course_id_r1: Option<i32>,
|
||||
sam_course_id_r2: Option<i32>,
|
||||
s3_settings: Option<S3Settings>,
|
||||
s3_client: Option<S3Client>,
|
||||
) -> Result<(), String> {
|
||||
let ZipEntryData {
|
||||
entry_name,
|
||||
safe_filename,
|
||||
content,
|
||||
unit_number,
|
||||
guessed_mimetype,
|
||||
is_flv,
|
||||
..
|
||||
} = entry;
|
||||
|
||||
let effective_sam_course_id = match (split_midpoint, unit_number) {
|
||||
(Some(mid), Some(u)) => {
|
||||
if u <= mid { sam_course_id_r1 } else { sam_course_id_r2 }
|
||||
}
|
||||
_ => sam_course_id,
|
||||
};
|
||||
|
||||
let asset_id = Uuid::new_v4();
|
||||
|
||||
let (storage_path, stored_filename, mimetype) = if is_flv {
|
||||
let temp_storage_filename = format!("{}.flv", asset_id);
|
||||
let temp_storage_path = format!("uploads/{}", temp_storage_filename);
|
||||
tokio::fs::write(&temp_storage_path, &content)
|
||||
.await
|
||||
.map_err(|e| format!("{}: local write failed ({})", entry_name, e))?;
|
||||
|
||||
let final_storage_filename = format!("{}.mp4", asset_id);
|
||||
let final_storage_path = format!("uploads/{}", final_storage_filename);
|
||||
if let Err((_, msg)) = transcode_flv_to_mp4(&temp_storage_path, &final_storage_path).await {
|
||||
let _ = tokio::fs::remove_file(&temp_storage_path).await;
|
||||
return Err(format!("{}: flv transcode failed ({})", entry_name, msg));
|
||||
}
|
||||
let _ = tokio::fs::remove_file(&temp_storage_path).await;
|
||||
|
||||
(
|
||||
final_storage_path,
|
||||
replace_extension(&safe_filename, "mp4"),
|
||||
"video/mp4".to_string(),
|
||||
)
|
||||
} else {
|
||||
let extension = StdPath::new(&safe_filename)
|
||||
.extension()
|
||||
.and_then(|s| s.to_str())
|
||||
.unwrap_or("");
|
||||
|
||||
let storage_filename = if extension.is_empty() {
|
||||
asset_id.to_string()
|
||||
} else {
|
||||
format!("{}.{}", asset_id, extension)
|
||||
};
|
||||
let storage_path = format!("uploads/{}", storage_filename);
|
||||
|
||||
(storage_path, safe_filename.clone(), guessed_mimetype)
|
||||
};
|
||||
|
||||
let storage_filename_for_s3 = StdPath::new(&storage_path)
|
||||
.file_name()
|
||||
.and_then(|s| s.to_str())
|
||||
.unwrap_or("")
|
||||
.to_string();
|
||||
|
||||
let (db_storage_path, persisted_size, _asset_public_url) = if !storage_filename_for_s3.is_empty() {
|
||||
if let (Some(settings), Some(client)) = (s3_settings.as_ref(), s3_client.as_ref()) {
|
||||
let key = build_s3_object_key(org_id, course_id, &storage_filename_for_s3);
|
||||
let upload_bytes = if is_flv {
|
||||
tokio::fs::read(&storage_path)
|
||||
.await
|
||||
.map_err(|e| format!("{}: local read failed ({})", entry_name, e))?
|
||||
} else {
|
||||
content
|
||||
};
|
||||
let uploaded_len = upload_bytes.len() as i64;
|
||||
let (s3_path, public_url) = push_bytes_to_s3(client, settings, &key, &mimetype, upload_bytes)
|
||||
.await
|
||||
.map_err(|(_, msg)| format!("{}: s3 upload failed ({})", entry_name, msg))?;
|
||||
|
||||
if is_flv {
|
||||
let _ = tokio::fs::remove_file(&storage_path).await;
|
||||
}
|
||||
|
||||
(s3_path, uploaded_len, public_url)
|
||||
} else {
|
||||
if !is_flv {
|
||||
tokio::fs::write(&storage_path, &content)
|
||||
.await
|
||||
.map_err(|e| format!("{}: local write failed ({})", entry_name, e))?;
|
||||
}
|
||||
|
||||
let size = tokio::fs::metadata(&storage_path)
|
||||
.await
|
||||
.map(|m| m.len() as i64)
|
||||
.unwrap_or(content.len() as i64);
|
||||
|
||||
(
|
||||
storage_path.clone(),
|
||||
size,
|
||||
format!("/assets/{}", storage_filename_for_s3),
|
||||
)
|
||||
}
|
||||
} else {
|
||||
(storage_path.clone(), content.len() as i64, storage_path.clone())
|
||||
};
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO assets (id, organization_id, uploaded_by, course_id, english_level, sam_plan_id, sam_course_id, unit_number, filename, storage_path, mimetype, size_bytes)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
|
||||
"#,
|
||||
)
|
||||
.bind(asset_id)
|
||||
.bind(org_id)
|
||||
.bind(user_id)
|
||||
.bind(course_id)
|
||||
.bind(&english_level)
|
||||
.bind(sam_plan_id)
|
||||
.bind(effective_sam_course_id)
|
||||
.bind(unit_number)
|
||||
.bind(&stored_filename)
|
||||
.bind(&db_storage_path)
|
||||
.bind(&mimetype)
|
||||
.bind(persisted_size)
|
||||
.execute(&pool)
|
||||
.await
|
||||
.map_err(|e| format!("{}: db insert failed ({})", entry_name, e))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn import_assets_zip(
|
||||
Org(org_ctx): Org,
|
||||
claims: Claims,
|
||||
@@ -824,6 +986,17 @@ pub async fn import_assets_zip(
|
||||
// Sort: audio/video first so their asset IDs are known when text is ingested
|
||||
all_entries.sort_by_key(|e| if e.is_audio_video { 0usize } else { 1 });
|
||||
|
||||
let s3_settings = get_s3_settings();
|
||||
let s3_client = if let Some(settings) = &s3_settings {
|
||||
Some(build_s3_client(settings).await?)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
tokio::fs::create_dir_all("uploads")
|
||||
.await
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
|
||||
|
||||
// ── Phase 2: process entries ───────────────────────────────────────────────
|
||||
let mut imported_assets = 0usize;
|
||||
let mut rag_ingested_assets = 0usize;
|
||||
@@ -847,6 +1020,70 @@ pub async fn import_assets_zip(
|
||||
let ollama_url = ai::get_ollama_url();
|
||||
let model = ai::get_embedding_model();
|
||||
|
||||
if !ingest_rag {
|
||||
let org_id = org_ctx.id;
|
||||
let user_id = claims.sub;
|
||||
let concurrency = env::var("ZIP_IMPORT_CONCURRENCY")
|
||||
.ok()
|
||||
.and_then(|v| v.parse::<usize>().ok())
|
||||
.map(|v| v.clamp(1, 16))
|
||||
.unwrap_or(4);
|
||||
|
||||
let mut join_set: JoinSet<Result<(), String>> = JoinSet::new();
|
||||
|
||||
for entry in all_entries {
|
||||
while join_set.len() >= concurrency {
|
||||
match join_set.join_next().await {
|
||||
Some(Ok(Ok(()))) => imported_assets += 1,
|
||||
Some(Ok(Err(msg))) => failed_entries.push(msg),
|
||||
Some(Err(e)) => failed_entries.push(format!("zip worker failed: {}", e)),
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
|
||||
let pool_cl = pool.clone();
|
||||
let english_level_cl = english_level.clone();
|
||||
let s3_settings_cl = s3_settings.clone();
|
||||
let s3_client_cl = s3_client.clone();
|
||||
|
||||
join_set.spawn(async move {
|
||||
process_zip_entry_without_rag(
|
||||
entry,
|
||||
org_id,
|
||||
user_id,
|
||||
pool_cl,
|
||||
course_id,
|
||||
english_level_cl,
|
||||
sam_plan_id,
|
||||
sam_course_id,
|
||||
split_midpoint,
|
||||
sam_course_id_r1,
|
||||
sam_course_id_r2,
|
||||
s3_settings_cl,
|
||||
s3_client_cl,
|
||||
)
|
||||
.await
|
||||
});
|
||||
}
|
||||
|
||||
while let Some(result) = join_set.join_next().await {
|
||||
match result {
|
||||
Ok(Ok(())) => imported_assets += 1,
|
||||
Ok(Err(msg)) => failed_entries.push(msg),
|
||||
Err(e) => failed_entries.push(format!("zip worker failed: {}", e)),
|
||||
}
|
||||
}
|
||||
|
||||
let _ = tokio::fs::remove_file(&zip_path).await;
|
||||
|
||||
return Ok(Json(AssetZipImportResponse {
|
||||
imported_assets,
|
||||
rag_ingested_assets: 0,
|
||||
rag_chunks_ingested: 0,
|
||||
failed_entries,
|
||||
}));
|
||||
}
|
||||
|
||||
for entry in all_entries {
|
||||
let ZipEntryData {
|
||||
entry_name,
|
||||
@@ -871,9 +1108,6 @@ pub async fn import_assets_zip(
|
||||
let (storage_path, stored_filename, mimetype) = if is_flv {
|
||||
let temp_storage_filename = format!("{}.flv", asset_id);
|
||||
let temp_storage_path = format!("uploads/{}", temp_storage_filename);
|
||||
tokio::fs::create_dir_all("uploads")
|
||||
.await
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
|
||||
tokio::fs::write(&temp_storage_path, &content)
|
||||
.await
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
|
||||
@@ -901,13 +1135,6 @@ pub async fn import_assets_zip(
|
||||
};
|
||||
let storage_path = format!("uploads/{}", storage_filename);
|
||||
|
||||
tokio::fs::create_dir_all("uploads")
|
||||
.await
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
|
||||
tokio::fs::write(&storage_path, &content)
|
||||
.await
|
||||
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
|
||||
|
||||
(storage_path, safe_filename.clone(), guessed_mimetype)
|
||||
};
|
||||
|
||||
@@ -918,18 +1145,44 @@ pub async fn import_assets_zip(
|
||||
.to_string();
|
||||
|
||||
let (db_storage_path, asset_public_url) = if !storage_filename_for_s3.is_empty() {
|
||||
if let Some((s3_path, public_url)) = maybe_push_local_file_to_s3(
|
||||
&storage_path,
|
||||
&storage_filename_for_s3,
|
||||
&mimetype,
|
||||
org_ctx.id,
|
||||
course_id,
|
||||
)
|
||||
.await?
|
||||
{
|
||||
let _ = tokio::fs::remove_file(&storage_path).await;
|
||||
(s3_path, public_url)
|
||||
if let (Some(settings), Some(client)) = (s3_settings.as_ref(), s3_client.as_ref()) {
|
||||
let key = build_s3_object_key(org_ctx.id, course_id, &storage_filename_for_s3);
|
||||
let upload_bytes = if is_flv {
|
||||
match tokio::fs::read(&storage_path).await {
|
||||
Ok(bytes) => bytes,
|
||||
Err(e) => {
|
||||
failed_entries.push(format!("{}: local read failed ({})", entry_name, e));
|
||||
let _ = tokio::fs::remove_file(&storage_path).await;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
content.clone()
|
||||
};
|
||||
|
||||
match push_bytes_to_s3(client, settings, &key, &mimetype, upload_bytes).await {
|
||||
Ok((s3_path, public_url)) => {
|
||||
if is_flv {
|
||||
let _ = tokio::fs::remove_file(&storage_path).await;
|
||||
}
|
||||
(s3_path, public_url)
|
||||
}
|
||||
Err((_, msg)) => {
|
||||
if is_flv {
|
||||
let _ = tokio::fs::remove_file(&storage_path).await;
|
||||
}
|
||||
failed_entries.push(format!("{}: s3 upload failed ({})", entry_name, msg));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if !is_flv {
|
||||
if let Err(e) = tokio::fs::write(&storage_path, &content).await {
|
||||
failed_entries.push(format!("{}: local write failed ({})", entry_name, e));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
(
|
||||
storage_path.clone(),
|
||||
format!("/assets/{}", storage_filename_for_s3),
|
||||
|
||||
Reference in New Issue
Block a user