From 5fd8d466f7738e26a11cbb3efb618086018b230e Mon Sep 17 00:00:00 2001 From: Zhexuan Yang Date: Wed, 25 Feb 2026 09:47:38 +0800 Subject: [PATCH 1/4] Rewrite tests for WorkUnit-based reaping --- .../tests/zombie_reaper_test.rs | 248 ++++++++---------- 1 file changed, 111 insertions(+), 137 deletions(-) diff --git a/crates/roboflow-distributed/tests/zombie_reaper_test.rs b/crates/roboflow-distributed/tests/zombie_reaper_test.rs index 646451ad..6a5b2750 100644 --- a/crates/roboflow-distributed/tests/zombie_reaper_test.rs +++ b/crates/roboflow-distributed/tests/zombie_reaper_test.rs @@ -11,13 +11,16 @@ //! 4. Failed work units can be retried mod tests { - use std::time::Duration; + use std::time::Duration as StdDuration; + use roboflow_distributed::batch::WorkUnitKeys; + use roboflow_distributed::tikv::key::HeartbeatKeys; use roboflow_distributed::{ HeartbeatConfig, HeartbeatManager, HeartbeatRecord, ReaperConfig, ReaperMetrics, - ReclaimResult, + ReclaimResult, ZombieReaper, }; use roboflow_distributed::{TikvClient, WorkerStatus}; + use roboflow_distributed::{WorkFile, WorkUnit, WorkUnitStatus}; #[tokio::test] async fn test_heartbeat_manager() { @@ -33,8 +36,8 @@ mod tests { let pod_id = format!("test-worker-heartbeat-{}", uuid::Uuid::new_v4()); let config = HeartbeatConfig::new() - .with_interval(Duration::from_secs(10)) - .with_stale_threshold(Duration::from_secs(60)); + .with_interval(StdDuration::from_secs(10)) + .with_stale_threshold(StdDuration::from_secs(60)); // Clean up any existing heartbeat first let key = roboflow_distributed::tikv::key::HeartbeatKeys::heartbeat(&pod_id); @@ -132,137 +135,108 @@ mod tests { let _ = ReclaimResult::Skipped; } - // TODO: Rewrite these tests for WorkUnit-based reaping - // The old tests used JobRecord/JobStatus which have been removed - // - // #[tokio::test] - // async fn test_end_to_end_zombie_reclamation() { - // // This test requires a running TiKV instance - // let client = match TikvClient::from_env().await { - // Ok(c) => c, - // Err(_) => { - // println!("Skipping test: TiKV not available"); - // return; - // } - // }; - // - // let pod_id = "test-worker-zombie"; - // let job_id = "test-job-zombie"; - // - // // Create a job in Processing state - // let job = create_test_job(job_id, pod_id); - // client - // .put_job(&job) - // .await - // .expect("Failed to create test job"); - // - // // Create a heartbeat for the worker - // let heartbeat = create_test_heartbeat(pod_id); - // client - // .update_heartbeat(pod_id, &heartbeat) - // .await - // .expect("Failed to create heartbeat"); - // - // // Verify job is in Processing state - // let retrieved_job = client.get_job(job_id).await.expect("Failed to get job"); - // assert!(retrieved_job.is_some()); - // assert_eq!(retrieved_job.unwrap().status, JobStatus::Processing); - // - // // Wait for heartbeat to become stale (simulation) - // // In real scenario, we'd wait 5+ minutes, but for testing we use a short threshold - // - // // Create reaper with short stale threshold - // let config = ReaperConfig::new() - // .with_interval(Duration::from_secs(1)) - // .with_stale_threshold(Duration::from_secs(0)); // Immediately stale - // - // let reaper = ZombieReaper::new(std::sync::Arc::new(client.clone()), config); - // - // // Run one iteration - // let reclaimed_count = reaper - // .run_iteration() - // .await - // .expect("Failed to run reaper iteration"); - // - // // Since we set stale_threshold to 0, the heartbeat should be stale - // // and the job should be reclaimed - // assert!(reclaimed_count <= 1); - // - // // Verify job was reclaimed (status should be Pending) - // let final_job = client.get_job(job_id).await.expect("Failed to get job"); - // if let Some(job) = final_job { - // // Job should be in Pending state after reclamation - // assert_eq!(job.status, JobStatus::Pending); - // assert!(job.owner.is_none()); - // } - // - // // Cleanup - // let _ = client.delete(JobKeys::record(job_id)).await; - // let _ = client.delete(HeartbeatKeys::heartbeat(pod_id)).await; - // } - // - // // #[tokio::test] - // // async fn test_heartbeat_preserved_on_reclaim() { - // // This test verifies that checkpoints are preserved during job reclamation - // let client = match TikvClient::from_env().await { - // Ok(c) => c, - // Err(_) => { - // println!("Skipping test: TiKV not available"); - // return; - // } - // }; - // - // use roboflow_distributed::CheckpointState; - // - // let pod_id = "test-worker-checkpoint"; - // let job_id = "test-job-checkpoint"; - // - // // Create a job in Processing state - // let job = create_test_job(job_id, pod_id); - // client - // .put_job(&job) - // .await - // .expect("Failed to create test job"); - // - // // Create a checkpoint for the job - // let mut checkpoint = CheckpointState::new(job_id.to_string(), pod_id.to_string(), 1000); - // checkpoint - // .update(500, 50000) - // .expect("Failed to update checkpoint"); - // client - // .update_checkpoint(&checkpoint) - // .await - // .expect("Failed to create checkpoint"); - // - // // Verify checkpoint exists - // let retrieved_checkpoint = client - // .get_checkpoint(job_id) - // .await - // .expect("Failed to get checkpoint"); - // assert!(retrieved_checkpoint.is_some()); - // assert_eq!(retrieved_checkpoint.unwrap().last_frame, 500); - // - // // Reclaim the job with stale threshold of 0 - // let reclaimed = client - // .reclaim_job(job_id, 0) - // .await - // .expect("Failed to reclaim job"); - // assert!(reclaimed); - // - // // Verify checkpoint still exists after reclamation - // let final_checkpoint = client - // .get_checkpoint(job_id) - // .await - // .expect("Failed to get checkpoint after reclamation"); - // assert!(final_checkpoint.is_some()); - // let cp = final_checkpoint.unwrap(); - // assert_eq!( - // cp.last_frame, 500, - // "Checkpoint should be preserved after reclamation" - // ); - // - // // Cleanup - // let _ = client.delete(JobKeys::record(job_id)).await; - // let _ = client.delete(StateKeys::checkpoint(job_id)).await; - // } + #[tokio::test] + async fn test_end_to_end_zombie_reclamation() { + // This test requires a running TiKV instance + let client = match TikvClient::from_env().await { + Ok(c) => c, + Err(_) => { + println!("Skipping test: TiKV not available"); + return; + } + }; + + let pod_id = "test-worker-zombie"; + let batch_id = "test-batch-zombie"; + let unit_id = "test-unit-zombie"; + + // Create a work unit in Processing state + let mut work_unit = WorkUnit::with_id( + unit_id.to_string(), + batch_id.to_string(), + vec![WorkFile::new( + "s3://test-bucket/file.mcap".to_string(), + 1024, + )], + "s3://test-output/".to_string(), + "test-config-hash".to_string(), + ); + work_unit + .claim(pod_id.to_string()) + .expect("Failed to claim work unit"); + let work_unit_key = WorkUnitKeys::unit(batch_id, unit_id); + client + .put( + work_unit_key.clone(), + bincode::serialize(&work_unit).expect("Failed to serialize work unit"), + ) + .await + .expect("Failed to create test work unit"); + + // Create a heartbeat for the worker with last_heartbeat in the past + let mut heartbeat = HeartbeatRecord::new(pod_id.to_string()); + use chrono::{Duration, Utc}; + heartbeat.last_heartbeat = Utc::now() - Duration::seconds(10); // 10 seconds old + let heartbeat_key = HeartbeatKeys::heartbeat(pod_id); + client + .put( + heartbeat_key.clone(), + bincode::serialize(&heartbeat).expect("Failed to serialize heartbeat"), + ) + .await + .expect("Failed to create heartbeat"); + + // Verify work unit is in Processing state + let retrieved_data = client + .get(work_unit_key.clone()) + .await + .expect("Failed to get work unit"); + assert!(retrieved_data.is_some()); + let retrieved_unit: WorkUnit = bincode::deserialize(&retrieved_data.unwrap()) + .expect("Failed to deserialize work unit"); + assert_eq!(retrieved_unit.status, WorkUnitStatus::Processing); + assert_eq!(retrieved_unit.owner, Some(pod_id.to_string())); + + // Create reaper with short stale threshold + let config = ReaperConfig::new() + .with_interval(StdDuration::from_secs(1)) + .with_stale_threshold(StdDuration::from_secs(5)); // Stale after 5 seconds + + let reaper = ZombieReaper::new(std::sync::Arc::new(client.clone()), config); + + // Run one iteration + let reclaimed_count = reaper + .run_iteration() + .await + .expect("Failed to run reaper iteration"); + + // Since we set stale_threshold to 0, the heartbeat should be stale + // and the work unit should be reclaimed + assert!(reclaimed_count <= 1); + + // Verify work unit was reclaimed (status should be Failed) + let final_data = client + .get(work_unit_key.clone()) + .await + .expect("Failed to get work unit"); + if let Some(data) = final_data { + let final_unit: WorkUnit = + bincode::deserialize(&data).expect("Failed to deserialize work unit"); + // Work unit should be in Failed state after reclamation (allows retry) + assert_eq!(final_unit.status, WorkUnitStatus::Failed); + assert!(final_unit.owner.is_none()); + assert!(final_unit.error.is_some()); + assert!( + final_unit + .error + .as_ref() + .unwrap() + .contains("Worker died during processing") + ); + } + + // Cleanup + let _ = client.delete(work_unit_key).await; + let _ = client.delete(heartbeat_key).await; + } } From 80cad3d90c0a0efb9d76c3c16b3ed21d3038ef43 Mon Sep 17 00:00:00 2001 From: Zhexuan Yang Date: Thu, 26 Feb 2026 12:42:44 +0800 Subject: [PATCH 2/4] feat: populate failed_work_units with error details in batch status Previously, reconcile_running() counted failed work units but didn't populate the failed_work_units field with error details. This made debugging difficult as batch status showed only counts without actual error messages. Changes: - Import FailedWorkUnit in controller - Collect error details from failed work units during reconciliation - Update CLI to display failed work unit details (ID, file, error, retries) --- Cargo.lock | 2 +- Cargo.toml | 2 +- crates/roboflow-dataset/src/sources/bag.rs | 50 +++--- crates/roboflow-dataset/src/sources/mcap.rs | 7 +- .../src/batch/controller.rs | 23 ++- .../tests/test_controller_new.rs | 142 ++++++++++++++++++ src/bin/commands/batch.rs | 13 ++ 7 files changed, 211 insertions(+), 28 deletions(-) create mode 100644 crates/roboflow-distributed/tests/test_controller_new.rs diff --git a/Cargo.lock b/Cargo.lock index 0e46264a..389eae0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3986,7 +3986,7 @@ dependencies = [ [[package]] name = "robocodec" version = "0.1.0" -source = "git+https://github.com/archebase/robocodec?rev=019baae541f1cb1d89439e9940d5fbef98f38898#019baae541f1cb1d89439e9940d5fbef98f38898" +source = "git+https://github.com/archebase/robocodec?rev=b27acf0#b27acf0e5881c38f8cc7f09e25c15787a440ae13" dependencies = [ "async-trait", "aws-config", diff --git a/Cargo.toml b/Cargo.toml index 919b004f..bd0cc797 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ roboflow-distributed = { path = "crates/roboflow-distributed", version = "0.2.0" # External dependencies # Pinned to specific commit for reproducible builds -robocodec = { git = "https://github.com/archebase/robocodec", rev = "019baae541f1cb1d89439e9940d5fbef98f38898" } +robocodec = { git = "https://github.com/archebase/robocodec", rev = "b27acf0" } chrono = { version = "0.4", features = ["serde"] } async-trait = "0.1" tokio = { version = "1.40", features = ["rt-multi-thread", "sync"] } diff --git a/crates/roboflow-dataset/src/sources/bag.rs b/crates/roboflow-dataset/src/sources/bag.rs index d386fdac..efbf3080 100644 --- a/crates/roboflow-dataset/src/sources/bag.rs +++ b/crates/roboflow-dataset/src/sources/bag.rs @@ -48,6 +48,7 @@ impl BagSource { } } + #[allow(dead_code)] fn is_cloud_url(&self) -> bool { self.path.starts_with("s3://") || self.path.starts_with("oss://") } @@ -195,12 +196,6 @@ impl Source for BagSource { self.path = path.clone(); } - if self.is_cloud_url() { - return Err(SourceError::InvalidConfig( - "Cloud URLs not yet supported for BagSource. Use local files.".to_string(), - )); - } - let (metadata, rx, handle) = initialize_threaded_source(&self.path, "bag-decoder", |path, meta_tx, msg_tx| { spawn_local_decoder(path, meta_tx, msg_tx, "bag") @@ -308,6 +303,7 @@ impl BagSourceBatched { } } + #[allow(dead_code)] fn is_cloud_url(&self) -> bool { self.path.starts_with("s3://") || self.path.starts_with("oss://") } @@ -470,12 +466,6 @@ impl Source for BagSourceBatched { self.path = path.clone(); } - if self.is_cloud_url() { - return Err(SourceError::InvalidConfig( - "Batched mode not supported for cloud URLs yet".to_string(), - )); - } - let batch_size = self.batch_size; let (metadata, rx, handle) = initialize_threaded_source_batched( &self.path, @@ -601,6 +591,7 @@ impl BagSourceBlocking { } } + #[allow(dead_code)] fn is_cloud_url(&self) -> bool { self.path.starts_with("s3://") || self.path.starts_with("oss://") } @@ -709,12 +700,6 @@ impl Source for BagSourceBlocking { self.path = path.clone(); } - if self.is_cloud_url() { - return Err(SourceError::InvalidConfig( - "Blocking mode not supported for cloud URLs".to_string(), - )); - } - let batch_size = self.batch_size; let (tx, rx) = crossbeam_channel::bounded(16); let (meta_tx, meta_rx) = tokio::sync::oneshot::channel(); @@ -1025,3 +1010,32 @@ mod tests { assert!(!source.is_cloud_url()); } } + +#[cfg(test)] +mod s3_url_tests { + //! Tests verifying S3/OSS URLs are accepted (not rejected). + //! These tests verify that the artificial "Cloud URLs not yet supported" + //! restriction has been removed. + + use super::*; + + #[test] + fn test_bag_source_accepts_s3_url() { + let source = BagSource::new("s3://bucket/file.bag"); + assert!(source.is_ok(), "BagSource should accept S3 URLs"); + let source = source.unwrap(); + assert!(source.is_cloud_url()); + } + + #[test] + fn test_bag_source_batched_accepts_s3_url() { + let source = BagSourceBatched::new("s3://bucket/file.bag", 100); + assert!(source.is_ok(), "BagSourceBatched should accept S3 URLs"); + } + + #[test] + fn test_bag_source_blocking_accepts_s3_url() { + let source = BagSourceBlocking::new("s3://bucket/file.bag", 100); + assert!(source.is_ok(), "BagSourceBlocking should accept S3 URLs"); + } +} diff --git a/crates/roboflow-dataset/src/sources/mcap.rs b/crates/roboflow-dataset/src/sources/mcap.rs index 11f8f5c5..d456a8f9 100644 --- a/crates/roboflow-dataset/src/sources/mcap.rs +++ b/crates/roboflow-dataset/src/sources/mcap.rs @@ -48,6 +48,7 @@ impl McapSource { } } + #[allow(dead_code)] fn is_cloud_url(&self) -> bool { self.path.starts_with("s3://") || self.path.starts_with("oss://") } @@ -195,12 +196,6 @@ impl Source for McapSource { self.path = path.clone(); } - if self.is_cloud_url() { - return Err(SourceError::InvalidConfig( - "Cloud URLs not yet supported for McapSource. Use local files.".to_string(), - )); - } - let (metadata, rx, handle) = initialize_threaded_source(&self.path, "mcap-decoder", |path, meta_tx, msg_tx| { spawn_local_decoder(path, meta_tx, msg_tx, "mcap") diff --git a/crates/roboflow-distributed/src/batch/controller.rs b/crates/roboflow-distributed/src/batch/controller.rs index d44fb9d0..5737abd6 100644 --- a/crates/roboflow-distributed/src/batch/controller.rs +++ b/crates/roboflow-distributed/src/batch/controller.rs @@ -9,7 +9,7 @@ use super::key::{BatchIndexKeys, BatchKeys, WorkUnitKeys}; use super::spec::BatchSpec; -use super::status::{BatchPhase, BatchStatus, DiscoveryStatus}; +use super::status::{BatchPhase, BatchStatus, DiscoveryStatus, FailedWorkUnit}; use super::work_unit::{WorkUnit, WorkUnitStatus}; use crate::tikv::{TikvClient, TikvError}; @@ -373,12 +373,30 @@ impl BatchController { let mut completed = 0u32; let mut failed = 0u32; let mut processing = 0u32; + let mut failed_work_units = Vec::new(); for (key, value) in work_units { match bincode::deserialize::(&value) { Ok(unit) => match unit.status { WorkUnitStatus::Complete => completed += 1, - WorkUnitStatus::Failed | WorkUnitStatus::Dead => failed += 1, + WorkUnitStatus::Failed | WorkUnitStatus::Dead => { + failed += 1; + // Collect error details from failed work units + if let Some(error) = &unit.error { + let source_file = unit + .files + .first() + .map(|f| f.url.clone()) + .unwrap_or_else(|| "unknown".to_string()); + failed_work_units.push(FailedWorkUnit { + id: unit.id.clone(), + source_file, + error: error.clone(), + retries: unit.attempts, + failed_at: unit.updated_at, + }); + } + } WorkUnitStatus::Processing => processing += 1, _ => {} }, @@ -416,6 +434,7 @@ impl BatchController { status.files_completed = completed; status.files_failed = failed; status.files_active = processing; + status.failed_work_units = failed_work_units; if matches!(status.phase, BatchPhase::Failed) && failed == 0 diff --git a/crates/roboflow-distributed/tests/test_controller_new.rs b/crates/roboflow-distributed/tests/test_controller_new.rs new file mode 100644 index 00000000..083a1df8 --- /dev/null +++ b/crates/roboflow-distributed/tests/test_controller_new.rs @@ -0,0 +1,142 @@ +// SPDX-FileCopyrightText: 2026 ArcheBase +// +// SPDX-License-Identifier: MulanPSL-2.0 + +//! Integration test for failed_work_units population in batch status. + +use roboflow_distributed::batch::{ + BatchController, BatchIndexKeys, BatchKeys, BatchPhase, BatchSpec, BatchStatus, WorkFile, + WorkUnit, WorkUnitKeys, WorkUnitStatus, +}; +use roboflow_distributed::tikv::client::TikvClient; +use std::sync::Arc; + +fn unique_batch_id(prefix: &str) -> String { + format!("jobs:{}-{}", prefix, uuid::Uuid::new_v4()) +} + +async fn get_tikv_client() -> Option> { + match TikvClient::from_env().await { + Ok(client) => Some(Arc::new(client)), + Err(e) => { + println!("Skipping test: TiKV not available: {}", e); + None + } + } +} + +#[tokio::test] +async fn test_reconcile_populates_failed_work_units_with_error_details() { + //! Verify that reconcile populates failed_work_units with error details. + //! + //! This tests the fix for the issue where work unit failures showed no error + //! details in batch status output. + let tikv = match get_tikv_client().await { + Some(client) => client, + None => return, + }; + let controller = BatchController::with_client(tikv.clone()); + + let batch_id = unique_batch_id("test-failed-work-units"); + let batch_name = batch_id.strip_prefix("jobs:").unwrap(); + + // Create batch + let spec = BatchSpec::new( + batch_name, + vec!["s3://test/file.bag".to_string()], + "s3://output/".to_string(), + ); + controller.submit_batch(&spec).await.unwrap(); + + // Create work units: one complete, one failed with error + let complete_unit_id = "unit-complete"; + let failed_unit_id = "unit-failed"; + let error_message = "Test error: codec failure"; + + // Create complete work unit + let mut complete_unit = WorkUnit::with_id( + complete_unit_id.to_string(), + batch_id.to_string(), + vec![WorkFile::new("s3://test/file1.bag".to_string(), 1024)], + "s3://output/".to_string(), + "config-hash".to_string(), + ); + complete_unit.status = WorkUnitStatus::Complete; + + // Create failed work unit with error + let mut failed_unit = WorkUnit::with_id( + failed_unit_id.to_string(), + batch_id.to_string(), + vec![WorkFile::new("s3://test/file2.bag".to_string(), 2048)], + "s3://output/".to_string(), + "config-hash".to_string(), + ); + failed_unit.status = WorkUnitStatus::Dead; + failed_unit.error = Some(error_message.to_string()); + failed_unit.attempts = 3; + + // Store work units in TiKV + let complete_key = WorkUnitKeys::unit(&batch_id, complete_unit_id); + let failed_key = WorkUnitKeys::unit(&batch_id, failed_unit_id); + tikv.put( + complete_key.clone(), + bincode::serialize(&complete_unit).unwrap(), + ) + .await + .unwrap(); + tikv.put( + failed_key.clone(), + bincode::serialize(&failed_unit).unwrap(), + ) + .await + .unwrap(); + + // Transition batch to Running phase + let mut status = BatchStatus::new(); + status.transition_to(BatchPhase::Running); + status.set_work_units_total(2); + let status_key = BatchKeys::status(&batch_id); + tikv.put(status_key.clone(), bincode::serialize(&status).unwrap()) + .await + .unwrap(); + + // Trigger reconciliation using public API + let result = controller.reconcile_batch_id(&batch_id).await; + assert!(result.is_ok(), "Reconciliation should succeed"); + + // Get updated status + let updated_status = controller + .get_batch_status(&batch_id) + .await + .unwrap() + .unwrap(); + + // Verify failed_work_units is populated + assert_eq!( + updated_status.failed_work_units.len(), + 1, + "Should have one failed work unit" + ); + + let failed = &updated_status.failed_work_units[0]; + assert_eq!(failed.id, failed_unit_id); + assert_eq!(failed.source_file, "s3://test/file2.bag"); + assert_eq!(failed.error, error_message); + assert_eq!(failed.retries, 3); + + // Verify counts + assert_eq!(updated_status.work_units_completed, 1); + assert_eq!(updated_status.work_units_failed, 1); + + // Cleanup + let _ = tikv.delete(BatchKeys::spec(&batch_id)).await; + let _ = tikv.delete(BatchKeys::status(&batch_id)).await; + let _ = tikv.delete(complete_key).await; + let _ = tikv.delete(failed_key).await; + let _ = tikv + .delete(BatchIndexKeys::phase(BatchPhase::Pending, &batch_id)) + .await; + let _ = tikv + .delete(BatchIndexKeys::phase(BatchPhase::Running, &batch_id)) + .await; +} diff --git a/src/bin/commands/batch.rs b/src/bin/commands/batch.rs index 94f519ae..35b240ba 100644 --- a/src/bin/commands/batch.rs +++ b/src/bin/commands/batch.rs @@ -784,6 +784,19 @@ fn print_status_table(batch_id: &str, status: &BatchStatus) { println!(); println!("Error: {}", error); } + + // Display failed work units with error details + if !status.failed_work_units.is_empty() { + println!(); + println!("Failed Work Units:"); + for unit in &status.failed_work_units { + println!(); + println!(" ID: {}", unit.id); + println!(" File: {}", unit.source_file); + println!(" Error: {}", unit.error); + println!(" Retries: {}", unit.retries); + } + } } /// Format a duration for display. From f3bb63a6db5aa13ae7e4ac77ec5a43277f869590 Mon Sep 17 00:00:00 2001 From: Zhexuan Yang Date: Thu, 26 Feb 2026 13:57:37 +0800 Subject: [PATCH 3/4] fix: address code review feedback - Fix failed work units with error=None to always appear in failed_work_units (was silently omitted causing mismatch between count and details) - Change #[allow(dead_code)] to #[cfg(test)] for is_cloud_url methods - Fix stale comment in zombie_reaper_test.rs (threshold is 5s not 0s) - Fix weak assertion reclaimed_count <= 1 to reclaimed_count == 1 --- crates/roboflow-dataset/src/sources/bag.rs | 6 ++-- crates/roboflow-dataset/src/sources/mcap.rs | 2 +- .../src/batch/controller.rs | 31 ++++++++++--------- .../tests/zombie_reaper_test.rs | 7 +++-- 4 files changed, 26 insertions(+), 20 deletions(-) diff --git a/crates/roboflow-dataset/src/sources/bag.rs b/crates/roboflow-dataset/src/sources/bag.rs index efbf3080..28d1331d 100644 --- a/crates/roboflow-dataset/src/sources/bag.rs +++ b/crates/roboflow-dataset/src/sources/bag.rs @@ -48,7 +48,7 @@ impl BagSource { } } - #[allow(dead_code)] + #[cfg(test)] fn is_cloud_url(&self) -> bool { self.path.starts_with("s3://") || self.path.starts_with("oss://") } @@ -303,7 +303,7 @@ impl BagSourceBatched { } } - #[allow(dead_code)] + #[cfg(test)] fn is_cloud_url(&self) -> bool { self.path.starts_with("s3://") || self.path.starts_with("oss://") } @@ -591,7 +591,7 @@ impl BagSourceBlocking { } } - #[allow(dead_code)] + #[cfg(test)] fn is_cloud_url(&self) -> bool { self.path.starts_with("s3://") || self.path.starts_with("oss://") } diff --git a/crates/roboflow-dataset/src/sources/mcap.rs b/crates/roboflow-dataset/src/sources/mcap.rs index d456a8f9..55e39161 100644 --- a/crates/roboflow-dataset/src/sources/mcap.rs +++ b/crates/roboflow-dataset/src/sources/mcap.rs @@ -48,7 +48,7 @@ impl McapSource { } } - #[allow(dead_code)] + #[cfg(test)] fn is_cloud_url(&self) -> bool { self.path.starts_with("s3://") || self.path.starts_with("oss://") } diff --git a/crates/roboflow-distributed/src/batch/controller.rs b/crates/roboflow-distributed/src/batch/controller.rs index 5737abd6..461c3949 100644 --- a/crates/roboflow-distributed/src/batch/controller.rs +++ b/crates/roboflow-distributed/src/batch/controller.rs @@ -382,20 +382,23 @@ impl BatchController { WorkUnitStatus::Failed | WorkUnitStatus::Dead => { failed += 1; // Collect error details from failed work units - if let Some(error) = &unit.error { - let source_file = unit - .files - .first() - .map(|f| f.url.clone()) - .unwrap_or_else(|| "unknown".to_string()); - failed_work_units.push(FailedWorkUnit { - id: unit.id.clone(), - source_file, - error: error.clone(), - retries: unit.attempts, - failed_at: unit.updated_at, - }); - } + let error = unit + .error + .as_ref() + .cloned() + .unwrap_or_else(|| "Unknown error".to_string()); + let source_file = unit + .files + .first() + .map(|f| f.url.clone()) + .unwrap_or_else(|| "unknown".to_string()); + failed_work_units.push(FailedWorkUnit { + id: unit.id.clone(), + source_file, + error, + retries: unit.attempts, + failed_at: unit.updated_at, + }); } WorkUnitStatus::Processing => processing += 1, _ => {} diff --git a/crates/roboflow-distributed/tests/zombie_reaper_test.rs b/crates/roboflow-distributed/tests/zombie_reaper_test.rs index 6a5b2750..9804ab77 100644 --- a/crates/roboflow-distributed/tests/zombie_reaper_test.rs +++ b/crates/roboflow-distributed/tests/zombie_reaper_test.rs @@ -210,9 +210,12 @@ mod tests { .await .expect("Failed to run reaper iteration"); - // Since we set stale_threshold to 0, the heartbeat should be stale + // Since we set stale_threshold to 5 seconds, the heartbeat should be stale // and the work unit should be reclaimed - assert!(reclaimed_count <= 1); + assert_eq!( + reclaimed_count, 1, + "Expected exactly one work unit to be reclaimed" + ); // Verify work unit was reclaimed (status should be Failed) let final_data = client From 6524a26c7667f2947db57af130f6453a98b500f1 Mon Sep 17 00:00:00 2001 From: Zhexuan Yang Date: Thu, 26 Feb 2026 14:11:43 +0800 Subject: [PATCH 4/4] fix: address remaining code review feedback from Kilo - Use full 40-char SHA for robocodec rev in Cargo.toml for reproducibility - Use unique UUID-based IDs in zombie_reaper_test.rs to avoid parallel CI interference - Add unconditional assertion before if let to prevent silent test skip --- Cargo.lock | 2 +- Cargo.toml | 2 +- .../tests/zombie_reaper_test.rs | 14 +++++++++----- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 389eae0f..cd3377fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3986,7 +3986,7 @@ dependencies = [ [[package]] name = "robocodec" version = "0.1.0" -source = "git+https://github.com/archebase/robocodec?rev=b27acf0#b27acf0e5881c38f8cc7f09e25c15787a440ae13" +source = "git+https://github.com/archebase/robocodec?rev=b27acf0e5881c38f8cc7f09e25c15787a440ae13#b27acf0e5881c38f8cc7f09e25c15787a440ae13" dependencies = [ "async-trait", "aws-config", diff --git a/Cargo.toml b/Cargo.toml index bd0cc797..ea79ef3d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ roboflow-distributed = { path = "crates/roboflow-distributed", version = "0.2.0" # External dependencies # Pinned to specific commit for reproducible builds -robocodec = { git = "https://github.com/archebase/robocodec", rev = "b27acf0" } +robocodec = { git = "https://github.com/archebase/robocodec", rev = "b27acf0e5881c38f8cc7f09e25c15787a440ae13" } chrono = { version = "0.4", features = ["serde"] } async-trait = "0.1" tokio = { version = "1.40", features = ["rt-multi-thread", "sync"] } diff --git a/crates/roboflow-distributed/tests/zombie_reaper_test.rs b/crates/roboflow-distributed/tests/zombie_reaper_test.rs index 9804ab77..ca467a39 100644 --- a/crates/roboflow-distributed/tests/zombie_reaper_test.rs +++ b/crates/roboflow-distributed/tests/zombie_reaper_test.rs @@ -146,9 +146,9 @@ mod tests { } }; - let pod_id = "test-worker-zombie"; - let batch_id = "test-batch-zombie"; - let unit_id = "test-unit-zombie"; + let pod_id = format!("test-worker-zombie-{}", uuid::Uuid::new_v4()); + let batch_id = format!("test-batch-zombie-{}", uuid::Uuid::new_v4()); + let unit_id = format!("test-unit-zombie-{}", uuid::Uuid::new_v4()); // Create a work unit in Processing state let mut work_unit = WorkUnit::with_id( @@ -164,7 +164,7 @@ mod tests { work_unit .claim(pod_id.to_string()) .expect("Failed to claim work unit"); - let work_unit_key = WorkUnitKeys::unit(batch_id, unit_id); + let work_unit_key = WorkUnitKeys::unit(&batch_id, &unit_id); client .put( work_unit_key.clone(), @@ -177,7 +177,7 @@ mod tests { let mut heartbeat = HeartbeatRecord::new(pod_id.to_string()); use chrono::{Duration, Utc}; heartbeat.last_heartbeat = Utc::now() - Duration::seconds(10); // 10 seconds old - let heartbeat_key = HeartbeatKeys::heartbeat(pod_id); + let heartbeat_key = HeartbeatKeys::heartbeat(&pod_id); client .put( heartbeat_key.clone(), @@ -222,6 +222,10 @@ mod tests { .get(work_unit_key.clone()) .await .expect("Failed to get work unit"); + assert!( + final_data.is_some(), + "Work unit should exist after reclamation" + ); if let Some(data) = final_data { let final_unit: WorkUnit = bincode::deserialize(&data).expect("Failed to deserialize work unit");