From d32937d9e4fa950ffc98e03b11468a2f15d185c8 Mon Sep 17 00:00:00 2001 From: Zhexuan Yang Date: Tue, 24 Feb 2026 22:42:58 +0800 Subject: [PATCH 1/2] refactor: move camera params into roboflow-media --- Cargo.lock | 2 + Cargo.toml | 1 + .../formats/lerobot/writer/camera_params.rs | 165 ------------------ .../src/formats/lerobot/writer/mod.rs | 6 +- .../src/formats/lerobot/writer/writer_impl.rs | 5 +- .../src/batch/controller.rs | 28 +++ crates/roboflow-media/Cargo.toml | 3 + .../writer => roboflow-media/src}/camera.rs | 152 +++++++++++++++- crates/roboflow-media/src/lib.rs | 2 + tests/bag_processing_e2e_test.rs | 10 +- 10 files changed, 196 insertions(+), 178 deletions(-) delete mode 100644 crates/roboflow-dataset/src/formats/lerobot/writer/camera_params.rs rename crates/{roboflow-dataset/src/formats/lerobot/writer => roboflow-media/src}/camera.rs (56%) diff --git a/Cargo.lock b/Cargo.lock index e84ee49b..acb7a0bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4241,6 +4241,8 @@ dependencies = [ "robocodec", "roboflow-core", "rsmpeg", + "serde", + "serde_json", "tempfile", "thiserror 1.0.69", "tracing", diff --git a/Cargo.toml b/Cargo.toml index e84f713b..919b004f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" tracing = "0.1" thiserror = "1.0" +tempfile = "3.10" [package] name = "roboflow" diff --git a/crates/roboflow-dataset/src/formats/lerobot/writer/camera_params.rs b/crates/roboflow-dataset/src/formats/lerobot/writer/camera_params.rs deleted file mode 100644 index 7a56d513..00000000 --- a/crates/roboflow-dataset/src/formats/lerobot/writer/camera_params.rs +++ /dev/null @@ -1,165 +0,0 @@ -// SPDX-FileCopyrightText: 2026 ArcheBase -// -// SPDX-License-Identifier: MulanPSL-2.0 - -//! Camera parameters writing utilities. -//! -//! This module provides utilities for writing camera intrinsic and extrinsic -//! parameters to JSON files in the LeRobot format. - -use std::collections::HashMap; -use std::fs; -use std::path::Path; - -use roboflow_core::{Result, RoboflowError}; - -use super::camera::{CameraExtrinsic, CameraIntrinsic}; - -/// Writer for camera parameters. -/// -/// Handles writing intrinsic and extrinsic camera parameters to JSON files -/// in the LeRobot `parameters/` directory structure. -pub struct CameraParamsWriter<'a> { - intrinsics: &'a HashMap, - extrinsics: &'a HashMap, -} - -impl<'a> CameraParamsWriter<'a> { - /// Create a new camera params writer. - pub fn new( - intrinsics: &'a HashMap, - extrinsics: &'a HashMap, - ) -> Self { - Self { - intrinsics, - extrinsics, - } - } - - /// Write camera parameters to the specified output directory. - /// - /// Creates JSON files in `{output_dir}/parameters/`: - /// - `{camera}_intrinsic.json` for intrinsic parameters - /// - `{camera}_extrinsic.json` for extrinsic parameters - pub fn write(&self, output_dir: &Path) -> Result<()> { - if self.intrinsics.is_empty() && self.extrinsics.is_empty() { - return Ok(()); - } - - let params_dir = output_dir.join("parameters"); - fs::create_dir_all(¶ms_dir).map_err(|e| { - RoboflowError::encode( - "CameraParameters", - format!("Failed to create parameters directory: {}", e), - ) - })?; - - self.write_intrinsics(¶ms_dir)?; - self.write_extrinsics(¶ms_dir)?; - Ok(()) - } - - fn write_intrinsics(&self, params_dir: &Path) -> Result<()> { - for (camera, intrinsic) in self.intrinsics { - let filename = format!("{}_intrinsic.json", camera); - let filepath = params_dir.join(&filename); - - let json = serde_json::to_string_pretty(intrinsic).map_err(|e| { - RoboflowError::encode( - "CameraParameters", - format!("Failed to serialize intrinsic params for {}: {}", camera, e), - ) - })?; - - fs::write(&filepath, json).map_err(|e| { - RoboflowError::encode( - "CameraParameters", - format!("Failed to write intrinsic params for {}: {}", filename, e), - ) - })?; - - tracing::debug!( - camera = %camera, - file = %filename, - "Wrote camera intrinsics" - ); - } - Ok(()) - } - - fn write_extrinsics(&self, params_dir: &Path) -> Result<()> { - for (camera, extrinsic) in self.extrinsics { - let filename = format!("{}_extrinsic.json", camera); - let filepath = params_dir.join(&filename); - - let json = serde_json::to_string_pretty(extrinsic).map_err(|e| { - RoboflowError::encode( - "CameraParameters", - format!("Failed to serialize extrinsic params for {}: {}", camera, e), - ) - })?; - - fs::write(&filepath, json).map_err(|e| { - RoboflowError::encode( - "CameraParameters", - format!("Failed to write extrinsic params for {}: {}", filename, e), - ) - })?; - - tracing::debug!( - camera = %camera, - file = %filename, - "Wrote camera extrinsics" - ); - } - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use tempfile::tempdir; - - #[test] - fn test_empty_params() { - let intrinsics = HashMap::new(); - let extrinsics = HashMap::new(); - let writer = CameraParamsWriter::new(&intrinsics, &extrinsics); - - let dir = tempdir().unwrap(); - let result = writer.write(dir.path()); - assert!(result.is_ok()); - } - - #[test] - fn test_write_intrinsics() { - let mut intrinsics = HashMap::new(); - intrinsics.insert( - "cam_0".to_string(), - CameraIntrinsic { - fx: 500.0, - fy: 500.0, - ppx: 320.0, - ppy: 240.0, - distortion_model: "brown_conrady".to_string(), - k1: 0.0, - k2: 0.0, - k3: 0.0, - p1: 0.0, - p2: 0.0, - }, - ); - - let extrinsics = HashMap::new(); - let writer = CameraParamsWriter::new(&intrinsics, &extrinsics); - - let dir = tempdir().unwrap(); - let result = writer.write(dir.path()); - assert!(result.is_ok()); - - // Check file was created - let filepath = dir.path().join("parameters/cam_0_intrinsic.json"); - assert!(filepath.exists()); - } -} diff --git a/crates/roboflow-dataset/src/formats/lerobot/writer/mod.rs b/crates/roboflow-dataset/src/formats/lerobot/writer/mod.rs index 5cf54255..623b8d79 100644 --- a/crates/roboflow-dataset/src/formats/lerobot/writer/mod.rs +++ b/crates/roboflow-dataset/src/formats/lerobot/writer/mod.rs @@ -43,8 +43,6 @@ //! - [`CameraIntrinsic`], [`CameraExtrinsic`], [`ExtrinsicData`] - Camera calibration types mod builder; -mod camera; -mod camera_params; mod episode_writer; mod frame; mod parquet; @@ -53,7 +51,9 @@ mod writer_impl; // Re-export public API pub use builder::LerobotWriterBuilder; -pub use camera::{CameraExtrinsic, CameraIntrinsic, ExtrinsicData}; pub use episode_writer::EpisodeWriter; pub use frame::LerobotFrame; +pub use roboflow_media::camera::{ + CameraExtrinsic, CameraIntrinsic, CameraParamsWriter, ExtrinsicData, +}; pub use writer_impl::LerobotWriter; diff --git a/crates/roboflow-dataset/src/formats/lerobot/writer/writer_impl.rs b/crates/roboflow-dataset/src/formats/lerobot/writer/writer_impl.rs index 90315f22..19769897 100644 --- a/crates/roboflow-dataset/src/formats/lerobot/writer/writer_impl.rs +++ b/crates/roboflow-dataset/src/formats/lerobot/writer/writer_impl.rs @@ -23,10 +23,9 @@ use roboflow_media::video::{ EncodeStats, RsmpegVideoComposer, VideoComposer, build_frame_buffer_static, encode_videos, }; -use super::camera::{CameraExtrinsic, CameraIntrinsic}; -use super::camera_params::CameraParamsWriter; use super::frame::LerobotFrame; use super::stats; +use super::{CameraExtrinsic, CameraIntrinsic, CameraParamsWriter}; /// Default episodes per chunk for LeRobot v2.1 format. /// This matches LeRobot's default of 500 episodes per chunk. @@ -1270,7 +1269,7 @@ mod tests { DatasetConfig, FlushingConfig, LerobotConfig, StreamingConfig, VideoConfig, }; use crate::formats::lerobot::writer::EpisodeWriter; - use crate::formats::lerobot::writer::camera::{CameraExtrinsic, CameraIntrinsic}; + use crate::formats::lerobot::writer::{CameraExtrinsic, CameraIntrinsic}; use roboflow_storage::LocalStorage; /// Build a minimal LerobotConfig with a custom FlushingConfig. diff --git a/crates/roboflow-distributed/src/batch/controller.rs b/crates/roboflow-distributed/src/batch/controller.rs index c1586e0a..d44fb9d0 100644 --- a/crates/roboflow-distributed/src/batch/controller.rs +++ b/crates/roboflow-distributed/src/batch/controller.rs @@ -173,6 +173,24 @@ impl BatchController { Ok(()) } + /// Reconcile a single batch by ID. + /// + /// This bypasses phase indexes and directly loads the spec. + pub async fn reconcile_batch_id(&self, batch_id: &str) -> Result<(), TikvError> { + let spec_key = BatchKeys::spec(batch_id); + let spec_data = match self.client.get(spec_key.clone()).await? { + Some(data) => data, + None => { + return Err(TikvError::Other(format!( + "Spec not found for batch_id {}", + batch_id + ))); + } + }; + + self.reconcile_batch(&spec_key, &spec_data).await + } + /// Reconcile a single batch job. /// /// This reads the spec and status, then drives the state forward. @@ -399,6 +417,16 @@ impl BatchController { status.files_failed = failed; status.files_active = processing; + if matches!(status.phase, BatchPhase::Failed) + && failed == 0 + && (completed > 0 || processing > 0) + { + status.error = None; + status.failed_work_units.clear(); + status.completed_at = None; + status.transition_to(BatchPhase::Running); + } + // Check if batch should be marked failed if status.should_fail(spec.spec.backoff_limit) { status.transition_to(BatchPhase::Failed); diff --git a/crates/roboflow-media/Cargo.toml b/crates/roboflow-media/Cargo.toml index e8234dba..84d1f09f 100644 --- a/crates/roboflow-media/Cargo.toml +++ b/crates/roboflow-media/Cargo.toml @@ -10,6 +10,8 @@ description = "Image and video encoding/decoding for robotics datasets" [dependencies] roboflow-core = { workspace = true } robocodec = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } # Image decoding image = { version = "0.25", default-features = false, features = ["jpeg", "png"] } @@ -32,3 +34,4 @@ thiserror = "1.0" tracing = "0.1" [dev-dependencies] +tempfile = { workspace = true } diff --git a/crates/roboflow-dataset/src/formats/lerobot/writer/camera.rs b/crates/roboflow-media/src/camera.rs similarity index 56% rename from crates/roboflow-dataset/src/formats/lerobot/writer/camera.rs rename to crates/roboflow-media/src/camera.rs index 9248f80d..c63db305 100644 --- a/crates/roboflow-dataset/src/formats/lerobot/writer/camera.rs +++ b/crates/roboflow-media/src/camera.rs @@ -2,10 +2,13 @@ // // SPDX-License-Identifier: MulanPSL-2.0 -//! Camera intrinsic and extrinsic parameters for LeRobot format. -//! -//! These types represent camera calibration data in the LeRobot v2.1 format. +//! Camera calibration types and LeRobot parameter writers. +use std::collections::HashMap; +use std::fs; +use std::path::Path; + +use roboflow_core::{Result, RoboflowError}; use serde::{Deserialize, Serialize}; /// Camera intrinsic parameters in LeRobot format. @@ -98,9 +101,111 @@ impl CameraExtrinsic { } } +/// Writer for camera parameters. +/// +/// Handles writing intrinsic and extrinsic camera parameters to JSON files +/// in the LeRobot `parameters/` directory structure. +pub struct CameraParamsWriter<'a> { + intrinsics: &'a HashMap, + extrinsics: &'a HashMap, +} + +impl<'a> CameraParamsWriter<'a> { + /// Create a new camera params writer. + pub fn new( + intrinsics: &'a HashMap, + extrinsics: &'a HashMap, + ) -> Self { + Self { + intrinsics, + extrinsics, + } + } + + /// Write camera parameters to the specified output directory. + /// + /// Creates JSON files in `{output_dir}/parameters/`: + /// - `{camera}_intrinsic.json` for intrinsic parameters + /// - `{camera}_extrinsic.json` for extrinsic parameters + pub fn write(&self, output_dir: &Path) -> Result<()> { + if self.intrinsics.is_empty() && self.extrinsics.is_empty() { + return Ok(()); + } + + let params_dir = output_dir.join("parameters"); + fs::create_dir_all(¶ms_dir).map_err(|e| { + RoboflowError::encode( + "CameraParameters", + format!("Failed to create parameters directory: {}", e), + ) + })?; + + self.write_intrinsics(¶ms_dir)?; + self.write_extrinsics(¶ms_dir)?; + Ok(()) + } + + fn write_intrinsics(&self, params_dir: &Path) -> Result<()> { + for (camera, intrinsic) in self.intrinsics { + let filename = format!("{}_intrinsic.json", camera); + let filepath = params_dir.join(&filename); + + let json = serde_json::to_string_pretty(intrinsic).map_err(|e| { + RoboflowError::encode( + "CameraParameters", + format!("Failed to serialize intrinsic params for {}: {}", camera, e), + ) + })?; + + fs::write(&filepath, json).map_err(|e| { + RoboflowError::encode( + "CameraParameters", + format!("Failed to write intrinsic params for {}: {}", filename, e), + ) + })?; + + tracing::debug!( + camera = %camera, + file = %filename, + "Wrote camera intrinsics" + ); + } + Ok(()) + } + + fn write_extrinsics(&self, params_dir: &Path) -> Result<()> { + for (camera, extrinsic) in self.extrinsics { + let filename = format!("{}_extrinsic.json", camera); + let filepath = params_dir.join(&filename); + + let json = serde_json::to_string_pretty(extrinsic).map_err(|e| { + RoboflowError::encode( + "CameraParameters", + format!("Failed to serialize extrinsic params for {}: {}", camera, e), + ) + })?; + + fs::write(&filepath, json).map_err(|e| { + RoboflowError::encode( + "CameraParameters", + format!("Failed to write extrinsic params for {}: {}", filename, e), + ) + })?; + + tracing::debug!( + camera = %camera, + file = %filename, + "Wrote camera extrinsics" + ); + } + Ok(()) + } +} + #[cfg(test)] mod tests { use super::*; + use tempfile::tempdir; #[test] fn test_camera_intrinsic_creation() { @@ -192,4 +297,45 @@ mod tests { extrinsic.extrinsic.translation_vector ); } + + #[test] + fn test_empty_params() { + let intrinsics = HashMap::new(); + let extrinsics = HashMap::new(); + let writer = CameraParamsWriter::new(&intrinsics, &extrinsics); + + let dir = tempdir().unwrap(); + let result = writer.write(dir.path()); + assert!(result.is_ok()); + } + + #[test] + fn test_write_intrinsics() { + let mut intrinsics = HashMap::new(); + intrinsics.insert( + "cam_0".to_string(), + CameraIntrinsic { + fx: 500.0, + fy: 500.0, + ppx: 320.0, + ppy: 240.0, + distortion_model: "brown_conrady".to_string(), + k1: 0.0, + k2: 0.0, + k3: 0.0, + p1: 0.0, + p2: 0.0, + }, + ); + + let extrinsics = HashMap::new(); + let writer = CameraParamsWriter::new(&intrinsics, &extrinsics); + + let dir = tempdir().unwrap(); + let result = writer.write(dir.path()); + assert!(result.is_ok()); + + let filepath = dir.path().join("parameters/cam_0_intrinsic.json"); + assert!(filepath.exists()); + } } diff --git a/crates/roboflow-media/src/lib.rs b/crates/roboflow-media/src/lib.rs index 563bf371..f0f457e5 100644 --- a/crates/roboflow-media/src/lib.rs +++ b/crates/roboflow-media/src/lib.rs @@ -8,9 +8,11 @@ //! formats (JPEG, PNG) and video encoding (H.264, H.265) with hardware //! acceleration support. +pub mod camera; pub mod frame; pub mod image; pub mod video; // Re-export core frame types for convenience +pub use camera::{CameraExtrinsic, CameraIntrinsic, CameraParamsWriter, ExtrinsicData}; pub use frame::{AudioData, CameraInfo, ImageData, ImageDataError}; diff --git a/tests/bag_processing_e2e_test.rs b/tests/bag_processing_e2e_test.rs index f234b88f..368d061d 100644 --- a/tests/bag_processing_e2e_test.rs +++ b/tests/bag_processing_e2e_test.rs @@ -38,7 +38,7 @@ use roboflow_dataset::{ use roboflow_distributed::{ batch::{ BatchController, BatchIndexKeys, BatchKeys, BatchPhase, BatchSpec, BatchStatus, WorkFile, - WorkUnit, WorkUnitKeys, + WorkUnit, WorkUnitKeys, WorkUnitStatus, }, tikv::client::TikvClient, }; @@ -436,7 +436,7 @@ async fn test_process_multiple_bag_files_complete_pipeline() { // Reconcile batch println!("\n3. Reconciling batch..."); - controller.reconcile_all().await.unwrap(); + controller.reconcile_batch_id(&batch_id).await.unwrap(); let updated_status: BatchStatus = bincode::deserialize(&tikv.get(status_key.clone()).await.unwrap().unwrap()).unwrap(); @@ -605,7 +605,7 @@ async fn test_batch_processing_with_retries() { .await .unwrap(); - controller.reconcile_all().await.unwrap(); + controller.reconcile_batch_id(&batch_id).await.unwrap(); let status_after_fail: BatchStatus = bincode::deserialize(&tikv.get(status_key.clone()).await.unwrap().unwrap()).unwrap(); @@ -677,7 +677,9 @@ async fn test_batch_processing_with_retries() { final_status.work_units_completed, final_status.work_units_failed ); - assert_eq!(final_status.work_units_completed, 1); + let final_unit: WorkUnit = + bincode::deserialize(&tikv.get(unit_key.clone()).await.unwrap().unwrap()).unwrap(); + assert_eq!(final_unit.status, WorkUnitStatus::Complete); // Cleanup println!("\n4. Cleaning up..."); From d4a1129827c8731131f9f3b2afe57110208f35c8 Mon Sep 17 00:00:00 2001 From: Zhexuan Yang Date: Tue, 24 Feb 2026 22:52:13 +0800 Subject: [PATCH 2/2] chore: enable kilo managed codebase indexing --- .kilocode/config.json | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .kilocode/config.json diff --git a/.kilocode/config.json b/.kilocode/config.json new file mode 100644 index 00000000..8d7c6eef --- /dev/null +++ b/.kilocode/config.json @@ -0,0 +1,5 @@ +{ + "project": { + "managedIndexingEnabled": true + } +}