diff --git a/docs/designs/production-units.md b/docs/designs/production-units.md new file mode 100644 index 0000000..f97752d --- /dev/null +++ b/docs/designs/production-units.md @@ -0,0 +1,198 @@ + + +# Production Units (Order / Batch / Task / Episode) Design (Edge) + +**Scope:** Order · Batch · Task · Episode lineage (Edge) + +## 1. Purpose and sources of truth + +This document defines the **public contract** and **core constraints** for production units on Keystone Edge. It is used to: + +- Guide integration and evolution across Keystone / Synapse / Axon +- Ensure the Edge lineage (Order→Batch→Task→Episode) stays consistent, auditable, and extensible + +--- + +## 2. Background, goals, and non-goals + +### 2.1 Background + +Production units turn a “production request” into “executable tasks”, and persist capture artifacts (MCAP + sidecar) as traceable Episodes to support downstream QA, sync, and search. + +### 2.2 Goals + +- **Complete lineage**: each Episode is traceable to a single Task, Batch, and Order, plus Scene/Subscene/SOP and workstation information. +- **Runnable Edge closed-loop**: Tasks can be generated and prepared (ready), recording/upload can be triggered, Episodes are persisted, and ACKs are confirmed. +- **Idempotent and recoverable**: upload/ACK retries do not create duplicate Episodes or incorrect counters. + +### 2.3 Non-goals + +- This document does not define cloud sync (Capstone), detailed QA rules, or UI interaction details. +- This document does not pin down “order completion policy” (e.g., backfilling failures, completion thresholds). It only constrains data and state validity. + +--- + +## 3. Domain relationships and invariants + +``` +Organization ── owns ──► Order ── has many ──► Batch ── has many ──► Task ── produces ──► Episode + │ │ + │ └── bound to one Workstation (batch dimension) + └── target_count, priority, scene_id, status ... 
+``` + +### 3.1 Core concepts + +- **Order**: a production request (target quantity, scene, priority, status). +- **Batch**: a production batch for an Order on a workstation (lineage dimension; carries episode count and batch status). +- **Task**: an atomic execution unit (binds SOP/Scene/Subscene/Workstation, drives recording and upload). +- **Episode**: a record of capture artifacts (mcap/json paths, QA fields, cloud processing fields, etc.). + +### 3.2 Core invariants (must hold long-term) + +- **Referential integrity**: an Episode must be able to resolve back to a Task; a Task must link to an Order and a Batch; a Task must resolve Scene/Subscene/SOP. +- **Idempotency**: a given `task_id` maps to at most one non-deleted Episode; repeated uploads/callbacks must not create duplicate Episodes. +- **Counter consistency**: `batches.episode_count` represents the number of persisted Episodes in that batch (+1 only when a new Episode is created). + +--- + +## 4. Data model (external semantics) + +This document does not restate full table schemas; it only defines key field semantics (see the migration file for details). + +- **Order** + - `target_count`: the maximum number of Tasks expected to be generated for the order (currently used to cap `POST /tasks`). + - `completed_count`: a derived statistic (based on the number of completed Tasks). +- **Batch** + - `batch_id`: a human-readable ID (for display/traceability). + - `episode_count`: the number of persisted Episodes (see 3.2). +- **Task** + - `task_id`: a human-readable ID (device/log-side primary identifier semantics). + - `status`: task state (see 6). + - Denormalized fields: `batch_name/scene_name/subscene_name/factory_id/organization_id/initial_scene_layout` for filtering and display. +- **Episode** + - `episode_id`: a human-readable ID (currently a UUID string in the implementation). + - `mcap_path/sidecar_path`: object storage paths (written by Transfer Verified ACK). + +--- + +## 5. 
HTTP API (production unit related) + +### 5.1 Orders (`/api/v1/orders`) + +| Method | Path | Notes | +|------|------|------| +| GET | `/orders` | List (non-deleted only) | +| POST | `/orders` | Create (default `status=created`) | +| GET | `/orders/:id` | Detail (includes `completed_count`) | +| PUT | `/orders/:id` | Update (`scene_id/name/target_count/priority/status/deadline/metadata`) | +| DELETE | `/orders/:id` | Soft delete (rejected if referenced by `batches/tasks/episodes`) | + +**Design constraints:** + +- **Deletion guard**: an Order referenced by the production lineage must not be deletable. +- **State transitions**: should converge to controlled transitions (current implementation allows any valid enum; the transition graph will be enforced later). + +--- + +### 5.2 Batches (`/api/v1/batches`) + +| Method | Path | Notes | +|------|------|------| +| GET | `/batches` | List (filters: `order_id/workstation_id/status/limit/offset`) | +| GET | `/batches/:id` | Detail | +| PATCH | `/batches/:id` | Status only; allows `pending -> active/cancelled`, `active -> completed/cancelled` | +| DELETE | `/batches/:id` | Soft delete (only allowed when `status=cancelled`) | + +**Design constraints:** + +- There is currently no `POST /batches`. Batches are created/reused implicitly during Task generation (see 5.3). + +--- + +### 5.3 Tasks (`/api/v1/tasks`) + +#### 5.3.1 Create (`POST /tasks`) + +`POST /tasks` is the entry point for creating Tasks per **(order + workstation)**: + +- **Request fields**: `order_id`, `sop_id`, `subscene_id`, `workstation_id`, optional `quantity` (default 1, range 1..1000) +- **Quantity constraint**: total Tasks under the same Order must not exceed `orders.target_count`. +- **Batch association (implicit)**: + - Prefer reusing a batch under the same `(order_id, workstation_id)` with status `pending/active`; + - Otherwise create a new `pending` batch. 
+ +#### 5.3.2 Query and config + +| Method | Path | Notes | +|------|------|------| +| GET | `/tasks` | List (filters: `workstation_id/status/limit/offset`) | +| GET | `/tasks/:id` | Detail (includes `episode` if linked) | +| PUT | `/tasks/:id` | Status update (restricted transitions) | +| GET | `/tasks/:id/config` | Generate recorder config (requires workstation robot + collector bindings) | + +--- + +## 6. State machines (design constraints + current implementation) + +### 6.1 Task states + +- **State set**: `pending` | `ready` | `in_progress` | `completed` | `failed` | `cancelled` +- **Prepare (pending→ready)**: triggered by UI/scheduler (currently via `PUT /tasks/:id`). +- **Complete (→completed)**: set after Episode persistence via Transfer Verified ACK (current implementation). + +### 6.2 Batch states + +`pending` | `active` | `completed` | `cancelled` | `recalled` +Currently `PATCH /batches/:id` only supports limited transitions (see 5.2) to control the batch lifecycle. + +--- + +## 7. Key flows (end-to-end closed-loop) + +### 7.1 Callbacks (HTTP: `/api/v1/callbacks/*`) + +| Method | Path | Notes | +|------|------|------| +| POST | `/callbacks/start` | **ACK only** (no DB validation; does not update `tasks.status`) | +| POST | `/callbacks/finish` | If the device is online, send `upload_request` to the Transfer hub (does not write Episode directly) | + +**Design constraint:** callbacks are device-side event entrypoints, but **Task terminal state and Episode persistence must be idempotent and retryable**; implementation may treat Transfer ACK as the source of truth. + +### 7.2 Transfer (WebSocket: `GET /transfer/:device_id`, separate port) + +When the device reports `upload_complete`, Keystone runs the Verified ACK flow: + +1. **Verify objects exist in S3**: `<key-prefix>/<task_id>.mcap` and `<key-prefix>/<task_id>.json` (object keys per `mcap_path`/`sidecar_path`, see §4) +2. 
**DB transaction**: + - Resolve `tasks.id` by `tasks.task_id` (numeric PK) + - **Idempotent**: if an Episode already exists for this `task_id`, do not insert again + - Insert into `episodes` (persist denormalized fields such as `batch_id/order_id/scene_id/...`) + - `batches.episode_count += 1` (only when a new Episode is inserted) + - Update `tasks.status` to **`completed`** (and set `completed_at`) +3. **Send `upload_ack`** to the device + +--- + +## 8. Known gaps and evolution + +- **In-recording state**: `callbacks/start` does not persist state today; `ready -> in_progress` validation/persistence is not implemented yet. +- **Failure path**: an end-to-end `failed` terminal state and error attribution are not fully implemented (callbacks/transfer need to be extended). +- **Controlled order transitions**: Order status updates currently only validate enum values; it should converge to controlled transitions aligned with Primer and linked to Task statistics. + +--- + +## 9. Code pointers (Keystone) + +| Area | Path | +|------|------| +| Task HTTP + callbacks | `internal/api/handlers/task.go` | +| Transfer + Episode writes | `internal/api/handlers/transfer.go` | +| Route mounting | `internal/server/server.go` | +| Table schemas | `internal/storage/database/migrations/000001_initial_schema.up.sql` | + +--- diff --git a/internal/api/handlers/batch.go b/internal/api/handlers/batch.go new file mode 100644 index 0000000..958eb95 --- /dev/null +++ b/internal/api/handlers/batch.go @@ -0,0 +1,472 @@ +// SPDX-FileCopyrightText: 2026 ArcheBase +// +// SPDX-License-Identifier: MulanPSL-2.0 + +package handlers + +import ( + "database/sql" + "encoding/json" + "fmt" + "net/http" + "strconv" + "strings" + "time" + + "archebase.com/keystone-edge/internal/logger" + "github.com/gin-gonic/gin" + "github.com/jmoiron/sqlx" +) + +// BatchHandler handles batch-related HTTP requests. +type BatchHandler struct { + db *sqlx.DB +} + +// NewBatchHandler creates a new BatchHandler. 
+func NewBatchHandler(db *sqlx.DB) *BatchHandler { + return &BatchHandler{db: db} +} + +// RegisterRoutes registers batch routes under the provided router group. +func (h *BatchHandler) RegisterRoutes(apiV1 *gin.RouterGroup) { + apiV1.GET("/batches", h.ListBatches) + apiV1.GET("/batches/:id", h.GetBatch) + apiV1.DELETE("/batches/:id", h.DeleteBatch) + apiV1.PATCH("/batches/:id", h.PatchBatch) +} + +var validBatchStatuses = map[string]struct{}{ + "pending": {}, + "active": {}, + "completed": {}, + "cancelled": {}, + "recalled": {}, +} + +// BatchListItem represents a batch item in list responses. +type BatchListItem struct { + ID string `json:"id" db:"id"` + BatchID string `json:"batch_id" db:"batch_id"` + OrderID string `json:"order_id" db:"order_id"` + WorkstationID string `json:"workstation_id" db:"workstation_id"` + Name string `json:"name" db:"name"` + Notes string `json:"notes,omitempty" db:"notes"` + Status string `json:"status" db:"status"` + EpisodeCount int `json:"episode_count" db:"episode_count"` + StartedAt string `json:"started_at,omitempty"` + EndedAt string `json:"ended_at,omitempty"` + Metadata any `json:"metadata,omitempty"` + CreatedAt string `json:"created_at,omitempty"` + UpdatedAt string `json:"updated_at,omitempty"` +} + +// ListBatchesResponse represents the response body for listing batches. 
+type ListBatchesResponse struct { + Batches []BatchListItem `json:"batches"` + Total int `json:"total"` + Limit int `json:"limit"` + Offset int `json:"offset"` +} + +type batchRow struct { + ID int64 `db:"id"` + BatchID string `db:"batch_id"` + OrderID int64 `db:"order_id"` + WorkstationID int64 `db:"workstation_id"` + Name string `db:"name"` + Notes sql.NullString `db:"notes"` + Status string `db:"status"` + EpisodeCount int `db:"episode_count"` + StartedAt sql.NullTime `db:"started_at"` + EndedAt sql.NullTime `db:"ended_at"` + Metadata sql.NullString `db:"metadata"` + CreatedAt sql.NullString `db:"created_at"` + UpdatedAt sql.NullString `db:"updated_at"` +} + +func parseNullableJSON(v sql.NullString) any { + if !v.Valid { + return nil + } + raw := strings.TrimSpace(v.String) + if raw == "" || raw == "null" { + return nil + } + + var out any + if err := json.Unmarshal([]byte(raw), &out); err != nil { + return nil + } + return out +} + +func batchListItemFromRow(r batchRow) BatchListItem { + notes := "" + if r.Notes.Valid { + notes = r.Notes.String + } + + startedAt := "" + if r.StartedAt.Valid { + startedAt = r.StartedAt.Time.UTC().Format(time.RFC3339) + } + endedAt := "" + if r.EndedAt.Valid { + endedAt = r.EndedAt.Time.UTC().Format(time.RFC3339) + } + + createdAt := "" + if r.CreatedAt.Valid { + createdAt = r.CreatedAt.String + } + updatedAt := "" + if r.UpdatedAt.Valid { + updatedAt = r.UpdatedAt.String + } + + return BatchListItem{ + ID: fmt.Sprintf("%d", r.ID), + BatchID: r.BatchID, + OrderID: fmt.Sprintf("%d", r.OrderID), + WorkstationID: fmt.Sprintf("%d", r.WorkstationID), + Name: r.Name, + Notes: notes, + Status: r.Status, + EpisodeCount: r.EpisodeCount, + StartedAt: startedAt, + EndedAt: endedAt, + Metadata: parseNullableJSON(r.Metadata), + CreatedAt: createdAt, + UpdatedAt: updatedAt, + } +} + +// ListBatches handles batch listing requests with optional filtering. 
+// +// @Summary List batches +// @Description Lists batches with optional order/workstation/status filters +// @Tags batches +// @Produce json +// @Param order_id query string false "Filter by order ID" +// @Param workstation_id query string false "Filter by workstation ID" +// @Param status query string false "Filter by status" +// @Param limit query int false "Max results" default(50) +// @Param offset query int false "Pagination offset" default(0) +// @Success 200 {object} ListBatchesResponse +// @Failure 400 {object} map[string]string +// @Failure 500 {object} map[string]string +// @Router /batches [get] +func (h *BatchHandler) ListBatches(c *gin.Context) { + const defaultLimit = 50 + + orderID := strings.TrimSpace(c.Query("order_id")) + workstationID := strings.TrimSpace(c.Query("workstation_id")) + status := strings.TrimSpace(c.Query("status")) + + limit := defaultLimit + if rawLimit := strings.TrimSpace(c.Query("limit")); rawLimit != "" { + parsedLimit, err := strconv.Atoi(rawLimit) + if err != nil || parsedLimit <= 0 { + c.JSON(http.StatusBadRequest, gin.H{"error": "limit must be a positive integer"}) + return + } + limit = parsedLimit + } + + offset := 0 + if rawOffset := strings.TrimSpace(c.Query("offset")); rawOffset != "" { + parsedOffset, err := strconv.Atoi(rawOffset) + if err != nil || parsedOffset < 0 { + c.JSON(http.StatusBadRequest, gin.H{"error": "offset must be a non-negative integer"}) + return + } + offset = parsedOffset + } + + if status != "" { + if _, ok := validBatchStatuses[status]; !ok { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid status"}) + return + } + } + + conditions := []string{"deleted_at IS NULL"} + args := make([]interface{}, 0, 6) + + if orderID != "" { + conditions = append(conditions, "CAST(order_id AS CHAR) = ?") + args = append(args, orderID) + } + if workstationID != "" { + conditions = append(conditions, "CAST(workstation_id AS CHAR) = ?") + args = append(args, workstationID) + } + if status != "" { + 
conditions = append(conditions, "status = ?") + args = append(args, status) + } + + whereClause := strings.Join(conditions, " AND ") + + var total int + countQuery := fmt.Sprintf("SELECT COUNT(*) FROM batches WHERE %s", whereClause) + if err := h.db.Get(&total, countQuery, args...); err != nil { + logger.Printf("[BATCH] Failed to count batches: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list batches"}) + return + } + + query := fmt.Sprintf(` + SELECT + id, + batch_id, + order_id, + workstation_id, + name, + notes, + status, + episode_count, + started_at, + ended_at, + CAST(metadata AS CHAR) AS metadata, + created_at, + updated_at + FROM batches + WHERE %s + ORDER BY id DESC + LIMIT ? OFFSET ? + `, whereClause) + + queryArgs := append(append([]interface{}{}, args...), limit, offset) + + var rows []batchRow + if err := h.db.Select(&rows, query, queryArgs...); err != nil { + logger.Printf("[BATCH] Failed to query batches: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list batches"}) + return + } + + batches := make([]BatchListItem, 0, len(rows)) + for _, r := range rows { + batches = append(batches, batchListItemFromRow(r)) + } + + c.JSON(http.StatusOK, ListBatchesResponse{ + Batches: batches, + Total: total, + Limit: limit, + Offset: offset, + }) +} + +// GetBatch returns a single batch by its numeric ID. 
+// +// @Summary Get batch +// @Description Returns a single batch by id +// @Tags batches +// @Produce json +// @Param id path int true "Batch ID" +// @Success 200 {object} BatchListItem +// @Failure 400 {object} map[string]string +// @Failure 404 {object} map[string]string +// @Failure 500 {object} map[string]string +// @Router /batches/{id} [get] +func (h *BatchHandler) GetBatch(c *gin.Context) { + idStr := c.Param("id") + id, err := strconv.ParseInt(idStr, 10, 64) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid batch id"}) + return + } + + query := ` + SELECT + id, + batch_id, + order_id, + workstation_id, + name, + notes, + status, + episode_count, + started_at, + ended_at, + CAST(metadata AS CHAR) AS metadata, + created_at, + updated_at + FROM batches + WHERE id = ? AND deleted_at IS NULL + ` + + var r batchRow + if err := h.db.Get(&r, query, id); err != nil { + if err == sql.ErrNoRows { + c.JSON(http.StatusNotFound, gin.H{"error": "batch not found"}) + return + } + logger.Printf("[BATCH] Failed to query batch: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to get batch"}) + return + } + + c.JSON(http.StatusOK, batchListItemFromRow(r)) +} + +// DeleteBatch handles batch deletion requests (soft delete). +// Only batches with status "cancelled" can be deleted. +// +// @Summary Delete batch +// @Description Soft deletes a batch by ID. Only allowed when status is cancelled. 
+// @Tags batches +// @Accept json +// @Produce json +// @Param id path int true "Batch ID" +// @Success 204 +// @Failure 400 {object} map[string]string +// @Failure 404 {object} map[string]string +// @Failure 500 {object} map[string]string +// @Router /batches/{id} [delete] +func (h *BatchHandler) DeleteBatch(c *gin.Context) { + idStr := c.Param("id") + id, err := strconv.ParseInt(idStr, 10, 64) + if err != nil || id <= 0 { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid batch id"}) + return + } + + var status string + if err := h.db.Get(&status, "SELECT status FROM batches WHERE id = ? AND deleted_at IS NULL", id); err != nil { + if err == sql.ErrNoRows { + c.JSON(http.StatusNotFound, gin.H{"error": "batch not found"}) + return + } + logger.Printf("[BATCH] Failed to query batch status: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete batch"}) + return + } + + if status != "cancelled" { + c.JSON(http.StatusBadRequest, gin.H{"error": "batch can only be deleted when status is cancelled"}) + return + } + + now := time.Now().UTC() + if _, err := h.db.Exec("UPDATE batches SET deleted_at = ?, updated_at = ? WHERE id = ? AND deleted_at IS NULL", now, now, id); err != nil { + logger.Printf("[BATCH] Failed to delete batch: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete batch"}) + return + } + + c.Status(http.StatusNoContent) +} + +// PatchBatchRequest is the request body for patching a batch. +type PatchBatchRequest struct { + Status string `json:"status"` +} + +// PatchBatchResponse is the response body for patching a batch. +type PatchBatchResponse struct { + ID string `json:"id"` + Status string `json:"status"` + StartedAt string `json:"started_at,omitempty"` + UpdatedAt string `json:"updated_at,omitempty"` +} + +// PatchBatch handles batch status updates. 
Only supports status transitions: +// - pending -> active | cancelled +// - active -> completed | cancelled +// +// @Summary Patch batch +// @Description Updates batch status. Only specific state transitions are allowed. +// @Tags batches +// @Accept json +// @Produce json +// @Param id path int true "Batch ID" +// @Param body body PatchBatchRequest true "Patch batch status" +// @Success 200 {object} PatchBatchResponse +// @Failure 400 {object} map[string]string +// @Failure 404 {object} map[string]string +// @Failure 500 {object} map[string]string +// @Router /batches/{id} [patch] +func (h *BatchHandler) PatchBatch(c *gin.Context) { + idStr := c.Param("id") + id, err := strconv.ParseInt(idStr, 10, 64) + if err != nil || id <= 0 { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid batch id"}) + return + } + + var req PatchBatchRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"}) + return + } + req.Status = strings.TrimSpace(req.Status) + if req.Status == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "status is required"}) + return + } + + type statusRow struct { + Status string `db:"status"` + StartedAt sql.NullTime `db:"started_at"` + UpdatedAt sql.NullTime `db:"updated_at"` + } + var cur statusRow + if err := h.db.Get(&cur, "SELECT status, started_at, updated_at FROM batches WHERE id = ? 
AND deleted_at IS NULL", id); err != nil { + if err == sql.ErrNoRows { + c.JSON(http.StatusNotFound, gin.H{"error": "batch not found"}) + return + } + logger.Printf("[BATCH] Failed to query batch: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to patch batch"}) + return + } + + allowed := false + switch cur.Status { + case "pending": + allowed = req.Status == "active" || req.Status == "cancelled" + case "active": + allowed = req.Status == "completed" || req.Status == "cancelled" + default: + allowed = false + } + if !allowed { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid status transition"}) + return + } + + now := time.Now().UTC() + startedAt := cur.StartedAt + updates := []string{"status = ?", "updated_at = ?"} + args := []interface{}{req.Status, now} + + // pending -> active sets started_at when not set yet + if cur.Status == "pending" && req.Status == "active" && !startedAt.Valid { + startedAt = sql.NullTime{Time: now, Valid: true} + updates = append(updates, "started_at = ?") + args = append(args, now) + } + + args = append(args, id) + query := fmt.Sprintf("UPDATE batches SET %s WHERE id = ? 
AND deleted_at IS NULL", strings.Join(updates, ", ")) + if _, err := h.db.Exec(query, args...); err != nil { + logger.Printf("[BATCH] Failed to patch batch: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to patch batch"}) + return + } + + startedAtOut := "" + if startedAt.Valid { + startedAtOut = startedAt.Time.UTC().Format(time.RFC3339) + } + c.JSON(http.StatusOK, PatchBatchResponse{ + ID: fmt.Sprintf("%d", id), + Status: req.Status, + StartedAt: startedAtOut, + UpdatedAt: now.Format(time.RFC3339), + }) +} diff --git a/internal/api/handlers/episode.go b/internal/api/handlers/episode.go index b6b6964..ca8e626 100644 --- a/internal/api/handlers/episode.go +++ b/internal/api/handlers/episode.go @@ -32,7 +32,7 @@ func NewEpisodeHandler(db *sqlx.DB) *EpisodeHandler { // Episode represents an episode in the database type episodeRow struct { ID string `db:"episode_id"` - TaskID string `db:"task_id"` + TaskID int64 `db:"task_id"` McapPath string `db:"mcap_path"` SidecarPath string `db:"sidecar_path"` Checksum sql.NullString `db:"checksum"` @@ -50,7 +50,7 @@ type episodeRow struct { // Episode represents an episode in the API response type Episode struct { ID string `json:"id"` - TaskID string `json:"task_id"` + TaskID int64 `json:"task_id"` McapPath string `json:"mcap_path"` SidecarPath string `json:"sidecar_path"` Checksum *string `json:"checksum"` @@ -112,7 +112,7 @@ func nullableTime(value sql.NullTime) *string { // @Description Returns a list of episodes with optional filtering by task_id, qa_status, auto_approved, and cloud_processed // @Tags episodes // @Produce json -// @Param task_id query string false "Filter by task ID" +// @Param task_id query string false "Filter by task numeric id (or legacy public task_id string)" // @Param qa_status query string false "Filter by QA status" // @Param auto_approved query bool false "Filter by auto-approval status" // @Param cloud_processed query bool false "Filter by cloud processing status" @@ 
-145,7 +145,7 @@ func (h *EpisodeHandler) ListEpisodes(c *gin.Context) { query := ` SELECT e.episode_id, - COALESCE(t.task_id, '') as task_id, + e.task_id as task_id, e.mcap_path, e.sidecar_path, COALESCE(e.qa_status, '') as qa_status, @@ -154,14 +154,12 @@ func (h *EpisodeHandler) ListEpisodes(c *gin.Context) { e.cloud_processed, e.created_at FROM episodes e - LEFT JOIN tasks t ON e.task_id = t.id WHERE e.deleted_at IS NULL ` countQuery := ` SELECT COUNT(1) FROM episodes e - LEFT JOIN tasks t ON e.task_id = t.id WHERE e.deleted_at IS NULL ` @@ -170,10 +168,19 @@ func (h *EpisodeHandler) ListEpisodes(c *gin.Context) { // Add filters if taskID != "" { - query += " AND t.task_id = ?" - countQuery += " AND t.task_id = ?" - args = append(args, taskID) - argsCount = append(argsCount, taskID) + // Prefer numeric task primary key (tasks.id / episodes.task_id). + // For backwards compatibility, also accept legacy public task_id (tasks.task_id). + if parsed, err := strconv.ParseInt(taskID, 10, 64); err == nil { + query += " AND e.task_id = ?" + countQuery += " AND e.task_id = ?" + args = append(args, parsed) + argsCount = append(argsCount, parsed) + } else { + query += " AND EXISTS (SELECT 1 FROM tasks t WHERE t.id = e.task_id AND t.task_id = ? AND t.deleted_at IS NULL)" + countQuery += " AND EXISTS (SELECT 1 FROM tasks t WHERE t.id = e.task_id AND t.task_id = ? AND t.deleted_at IS NULL)" + args = append(args, taskID) + argsCount = append(argsCount, taskID) + } } if qaStatus != "" { @@ -273,7 +280,7 @@ func (h *EpisodeHandler) GetEpisode(c *gin.Context) { query := ` SELECT e.episode_id, - COALESCE(t.task_id, '') AS task_id, + e.task_id AS task_id, e.mcap_path, e.sidecar_path, e.checksum, @@ -287,7 +294,6 @@ func (h *EpisodeHandler) GetEpisode(c *gin.Context) { e.cloud_synced_at, e.created_at FROM episodes e - LEFT JOIN tasks t ON e.task_id = t.id LEFT JOIN inspections i ON i.episode_id = e.id LEFT JOIN inspectors ins ON ins.id = i.inspector_id WHERE e.episode_id = ? 
AND e.deleted_at IS NULL diff --git a/internal/api/handlers/order.go b/internal/api/handlers/order.go new file mode 100644 index 0000000..ee173d9 --- /dev/null +++ b/internal/api/handlers/order.go @@ -0,0 +1,617 @@ +// SPDX-FileCopyrightText: 2026 ArcheBase +// +// SPDX-License-Identifier: MulanPSL-2.0 + +package handlers + +import ( + "database/sql" + "encoding/json" + "fmt" + "net/http" + "strconv" + "strings" + "time" + + "archebase.com/keystone-edge/internal/logger" + "github.com/gin-gonic/gin" + "github.com/jmoiron/sqlx" +) + +// OrderHandler handles order-related HTTP requests. +type OrderHandler struct { + db *sqlx.DB +} + +// NewOrderHandler creates a new OrderHandler. +func NewOrderHandler(db *sqlx.DB) *OrderHandler { + return &OrderHandler{db: db} +} + +// RegisterRoutes registers order routes under the provided router group. +func (h *OrderHandler) RegisterRoutes(apiV1 *gin.RouterGroup) { + apiV1.GET("/orders", h.ListOrders) + apiV1.POST("/orders", h.CreateOrder) + apiV1.GET("/orders/:id", h.GetOrder) + apiV1.PUT("/orders/:id", h.UpdateOrder) + apiV1.DELETE("/orders/:id", h.DeleteOrder) +} + +// OrderListResponse is the response body for listing orders. +type OrderListResponse struct { + Orders []OrderResponse `json:"orders"` +} + +// OrderResponse is the response body for a single order. +type OrderResponse struct { + ID string `json:"id"` + SceneID string `json:"scene_id"` + Name string `json:"name"` + TargetCount int `json:"target_count"` + CompletedCount int `json:"completed_count"` + Status string `json:"status"` + Priority string `json:"priority"` + Deadline string `json:"deadline,omitempty"` + Metadata any `json:"metadata,omitempty"` + CreatedAt string `json:"created_at,omitempty"` + UpdatedAt string `json:"updated_at,omitempty"` +} + +// CreateOrderRequest is the request body for creating an order. 
+type CreateOrderRequest struct { + SceneID string `json:"scene_id"` + Name string `json:"name"` + TargetCount int `json:"target_count"` + Priority string `json:"priority"` + Deadline *string `json:"deadline,omitempty"` // RFC3339 + Metadata json.RawMessage `json:"metadata,omitempty"` // JSON object +} + +// UpdateOrderRequest is the request body for partially updating an order. +type UpdateOrderRequest struct { + SceneID *string `json:"scene_id,omitempty"` + Name *string `json:"name,omitempty"` + TargetCount *int `json:"target_count,omitempty"` + Priority *string `json:"priority,omitempty"` + Status *string `json:"status,omitempty"` + Deadline *string `json:"deadline,omitempty"` // RFC3339 or empty to clear + // Metadata uses optionalJSONPatch so JSON null is distinct from omitting the key. + // For orders, explicit null is normalized to "{}" (empty object). + Metadata optionalJSONPatch `json:"metadata,omitempty"` +} + +type orderRow struct { + ID int64 `db:"id"` + SceneID int64 `db:"scene_id"` + Name string `db:"name"` + TargetCount int `db:"target_count"` + CompletedCount int `db:"completed_count"` + Status string `db:"status"` + Priority string `db:"priority"` + Deadline sql.NullTime `db:"deadline"` + Metadata sql.NullString `db:"metadata"` + CreatedAt sql.NullString `db:"created_at"` + UpdatedAt sql.NullString `db:"updated_at"` +} + +var validOrderPriorities = map[string]struct{}{ + "low": {}, + "normal": {}, + "high": {}, + "urgent": {}, +} + +var validOrderStatuses = map[string]struct{}{ + "created": {}, + "in_progress": {}, + "paused": {}, + "completed": {}, + "cancelled": {}, +} + +// ListOrders returns all non-deleted orders. 
+func (h *OrderHandler) ListOrders(c *gin.Context) { + query := ` + SELECT + o.id, + o.scene_id, + o.name, + o.target_count, + o.status, + o.priority, + o.deadline, + CAST(o.metadata AS CHAR) AS metadata, + o.created_at, + o.updated_at, + (SELECT COUNT(*) FROM tasks t WHERE t.order_id = o.id AND t.status = 'completed' AND t.deleted_at IS NULL) AS completed_count + FROM orders o + WHERE o.deleted_at IS NULL + ORDER BY o.id DESC + ` + + var rows []orderRow + if err := h.db.Select(&rows, query); err != nil { + logger.Printf("[ORDER] Failed to query orders: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list orders"}) + return + } + + orders := make([]OrderResponse, 0, len(rows)) + for _, r := range rows { + createdAt := "" + if r.CreatedAt.Valid { + createdAt = r.CreatedAt.String + } + updatedAt := "" + if r.UpdatedAt.Valid { + updatedAt = r.UpdatedAt.String + } + deadline := "" + if r.Deadline.Valid { + deadline = r.Deadline.Time.UTC().Format(time.RFC3339) + } + var metadata any + if r.Metadata.Valid && strings.TrimSpace(r.Metadata.String) != "" && strings.TrimSpace(r.Metadata.String) != "null" { + var v any + if err := json.Unmarshal([]byte(r.Metadata.String), &v); err == nil { + metadata = v + } + } + orders = append(orders, OrderResponse{ + ID: fmt.Sprintf("%d", r.ID), + SceneID: fmt.Sprintf("%d", r.SceneID), + Name: r.Name, + TargetCount: r.TargetCount, + CompletedCount: r.CompletedCount, + Status: r.Status, + Priority: r.Priority, + Deadline: deadline, + Metadata: metadata, + CreatedAt: createdAt, + UpdatedAt: updatedAt, + }) + } + + c.JSON(http.StatusOK, OrderListResponse{Orders: orders}) +} + +// GetOrder returns a single order by id. 
+func (h *OrderHandler) GetOrder(c *gin.Context) { + idStr := c.Param("id") + id, err := strconv.ParseInt(idStr, 10, 64) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid order id"}) + return + } + + query := ` + SELECT + o.id, + o.scene_id, + o.name, + o.target_count, + o.status, + o.priority, + o.deadline, + CAST(o.metadata AS CHAR) AS metadata, + o.created_at, + o.updated_at, + (SELECT COUNT(*) FROM tasks t WHERE t.order_id = o.id AND t.status = 'completed' AND t.deleted_at IS NULL) AS completed_count + FROM orders o + WHERE o.id = ? AND o.deleted_at IS NULL + ` + + var r orderRow + if err := h.db.Get(&r, query, id); err != nil { + if err == sql.ErrNoRows { + c.JSON(http.StatusNotFound, gin.H{"error": "order not found"}) + return + } + logger.Printf("[ORDER] Failed to query order: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to get order"}) + return + } + + createdAt := "" + if r.CreatedAt.Valid { + createdAt = r.CreatedAt.String + } + updatedAt := "" + if r.UpdatedAt.Valid { + updatedAt = r.UpdatedAt.String + } + deadline := "" + if r.Deadline.Valid { + deadline = r.Deadline.Time.UTC().Format(time.RFC3339) + } + var metadata any + if r.Metadata.Valid && strings.TrimSpace(r.Metadata.String) != "" && strings.TrimSpace(r.Metadata.String) != "null" { + var v any + if err := json.Unmarshal([]byte(r.Metadata.String), &v); err == nil { + metadata = v + } + } + + c.JSON(http.StatusOK, OrderResponse{ + ID: fmt.Sprintf("%d", r.ID), + SceneID: fmt.Sprintf("%d", r.SceneID), + Name: r.Name, + TargetCount: r.TargetCount, + CompletedCount: r.CompletedCount, + Status: r.Status, + Priority: r.Priority, + Deadline: deadline, + Metadata: metadata, + CreatedAt: createdAt, + UpdatedAt: updatedAt, + }) +} + +// CreateOrder creates a new order. 
+func (h *OrderHandler) CreateOrder(c *gin.Context) { + var req CreateOrderRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"}) + return + } + + req.SceneID = strings.TrimSpace(req.SceneID) + req.Name = strings.TrimSpace(req.Name) + req.Priority = strings.TrimSpace(req.Priority) + + if req.SceneID == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "scene_id is required"}) + return + } + if req.Name == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "name is required"}) + return + } + if req.TargetCount <= 0 { + c.JSON(http.StatusBadRequest, gin.H{"error": "target_count must be > 0"}) + return + } + if req.Priority == "" { + req.Priority = "normal" + } + if _, ok := validOrderPriorities[req.Priority]; !ok { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid priority"}) + return + } + + var deadline sql.NullTime + if req.Deadline != nil { + dl := strings.TrimSpace(*req.Deadline) + if dl != "" { + tm, err := time.Parse(time.RFC3339, dl) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid deadline format (RFC3339)"}) + return + } + deadline = sql.NullTime{Time: tm.UTC(), Valid: true} + } + } + + var metadataStr sql.NullString + if len(req.Metadata) > 0 { + raw := strings.TrimSpace(string(req.Metadata)) + if raw == "" || raw == "null" { + metadataStr = sql.NullString{Valid: false} + } else { + var tmp any + if err := json.Unmarshal(req.Metadata, &tmp); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid metadata (must be valid JSON)"}) + return + } + metadataStr = sql.NullString{String: raw, Valid: true} + } + } + + sceneID, err := strconv.ParseInt(req.SceneID, 10, 64) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid scene_id format"}) + return + } + + // Derive organization_id from scene -> factory -> organization + var organizationID int64 + err = h.db.Get(&organizationID, ` + SELECT f.organization_id + FROM scenes s 
+ JOIN factories f ON f.id = s.factory_id + WHERE s.id = ? AND s.deleted_at IS NULL AND f.deleted_at IS NULL + `, sceneID) + if err != nil { + if err == sql.ErrNoRows { + c.JSON(http.StatusBadRequest, gin.H{"error": "scene not found"}) + return + } + logger.Printf("[ORDER] Failed to derive organization_id: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create order"}) + return + } + + now := time.Now().UTC() + + result, err := h.db.Exec( + `INSERT INTO orders ( + organization_id, + scene_id, + name, + target_count, + priority, + deadline, + metadata, + status, + created_at, + updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, 'created', ?, ?)`, + organizationID, + sceneID, + req.Name, + req.TargetCount, + req.Priority, + deadline, + metadataStr, + now, + now, + ) + if err != nil { + logger.Printf("[ORDER] Failed to insert order: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create order"}) + return + } + + id, err := result.LastInsertId() + if err != nil { + logger.Printf("[ORDER] Failed to fetch inserted id: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to create order"}) + return + } + + var metadata any + if metadataStr.Valid && strings.TrimSpace(metadataStr.String) != "" && strings.TrimSpace(metadataStr.String) != "null" { + var v any + if err := json.Unmarshal([]byte(metadataStr.String), &v); err == nil { + metadata = v + } + } + deadlineOut := "" + if deadline.Valid { + deadlineOut = deadline.Time.UTC().Format(time.RFC3339) + } + + c.JSON(http.StatusCreated, OrderResponse{ + ID: fmt.Sprintf("%d", id), + SceneID: fmt.Sprintf("%d", sceneID), + Name: req.Name, + TargetCount: req.TargetCount, + CompletedCount: 0, + Status: "created", + Priority: req.Priority, + Deadline: deadlineOut, + Metadata: metadata, + CreatedAt: now.Format(time.RFC3339), + UpdatedAt: now.Format(time.RFC3339), + }) +} + +// UpdateOrder partially updates mutable order fields. 
+func (h *OrderHandler) UpdateOrder(c *gin.Context) { + idStr := c.Param("id") + id, err := strconv.ParseInt(idStr, 10, 64) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid order id"}) + return + } + + var req UpdateOrderRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid request body"}) + return + } + + // Ensure exists + var exists bool + if err := h.db.Get(&exists, "SELECT EXISTS(SELECT 1 FROM orders WHERE id = ? AND deleted_at IS NULL)", id); err != nil { + logger.Printf("[ORDER] Failed to check order existence: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update order"}) + return + } + if !exists { + c.JSON(http.StatusNotFound, gin.H{"error": "order not found"}) + return + } + + updates := []string{} + args := []interface{}{} + + if req.SceneID != nil { + sceneIDStr := strings.TrimSpace(*req.SceneID) + if sceneIDStr == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "scene_id cannot be empty"}) + return + } + sceneID, err := strconv.ParseInt(sceneIDStr, 10, 64) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid scene_id format"}) + return + } + var sceneExists bool + if err := h.db.Get(&sceneExists, "SELECT EXISTS(SELECT 1 FROM scenes WHERE id = ? 
AND deleted_at IS NULL)", sceneID); err != nil { + logger.Printf("[ORDER] Failed to verify scene: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update order"}) + return + } + if !sceneExists { + c.JSON(http.StatusBadRequest, gin.H{"error": "scene not found"}) + return + } + updates = append(updates, "scene_id = ?") + args = append(args, sceneID) + } + + if req.TargetCount != nil { + if *req.TargetCount <= 0 { + c.JSON(http.StatusBadRequest, gin.H{"error": "target_count must be > 0"}) + return + } + updates = append(updates, "target_count = ?") + args = append(args, *req.TargetCount) + } + + if req.Name != nil { + name := strings.TrimSpace(*req.Name) + if name == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "name cannot be empty"}) + return + } + updates = append(updates, "name = ?") + args = append(args, name) + } + + if req.Deadline != nil { + dl := strings.TrimSpace(*req.Deadline) + if dl == "" { + updates = append(updates, "deadline = NULL") + } else { + tm, err := time.Parse(time.RFC3339, dl) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid deadline format (RFC3339)"}) + return + } + updates = append(updates, "deadline = ?") + args = append(args, tm.UTC()) + } + } + + if req.Metadata.Present { + if req.Metadata.Value == nil { + updates = append(updates, "metadata = ?") + args = append(args, "{}") + } else { + // Keep order metadata as a JSON object for consistency. 
+ if _, ok := req.Metadata.Value.(map[string]interface{}); !ok { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid metadata (must be a JSON object)"}) + return + } + b, err := json.Marshal(req.Metadata.Value) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid metadata (must be valid JSON)"}) + return + } + updates = append(updates, "metadata = ?") + args = append(args, strings.TrimSpace(string(b))) + } + } + + if req.Priority != nil { + priority := strings.TrimSpace(*req.Priority) + if priority == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "priority cannot be empty"}) + return + } + if _, ok := validOrderPriorities[priority]; !ok { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid priority"}) + return + } + updates = append(updates, "priority = ?") + args = append(args, priority) + } + + if req.Status != nil { + status := strings.TrimSpace(*req.Status) + if status == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "status cannot be empty"}) + return + } + if _, ok := validOrderStatuses[status]; !ok { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid status"}) + return + } + updates = append(updates, "status = ?") + args = append(args, status) + } + + if len(updates) == 0 { + c.JSON(http.StatusBadRequest, gin.H{"error": "no fields to update"}) + return + } + + now := time.Now().UTC() + updates = append(updates, "updated_at = ?") + args = append(args, now, id) + + query := fmt.Sprintf("UPDATE orders SET %s WHERE id = ? AND deleted_at IS NULL", strings.Join(updates, ", ")) + if _, err := h.db.Exec(query, args...); err != nil { + logger.Printf("[ORDER] Failed to update order: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update order"}) + return + } + + h.GetOrder(c) +} + +// DeleteOrder soft-deletes an order if it is not referenced by other production units. 
+func (h *OrderHandler) DeleteOrder(c *gin.Context) { + idStr := c.Param("id") + id, err := strconv.ParseInt(idStr, 10, 64) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid order id"}) + return + } + + var exists bool + if err := h.db.Get(&exists, "SELECT EXISTS(SELECT 1 FROM orders WHERE id = ? AND deleted_at IS NULL)", id); err != nil { + logger.Printf("[ORDER] Failed to check order existence: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete order"}) + return + } + if !exists { + c.JSON(http.StatusNotFound, gin.H{"error": "order not found"}) + return + } + + // Reject deletion when the order has related batches/tasks/episodes. + var batchCount int + if err := h.db.Get(&batchCount, "SELECT COUNT(*) FROM batches WHERE order_id = ? AND deleted_at IS NULL", id); err != nil { + logger.Printf("[ORDER] Failed to check batch references: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete order"}) + return + } + if batchCount > 0 { + c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("cannot delete order referenced by %d batches", batchCount)}) + return + } + + var taskCount int + if err := h.db.Get(&taskCount, "SELECT COUNT(*) FROM tasks WHERE order_id = ? AND deleted_at IS NULL", id); err != nil { + logger.Printf("[ORDER] Failed to check task references: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete order"}) + return + } + if taskCount > 0 { + c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("cannot delete order referenced by %d tasks", taskCount)}) + return + } + + var episodeCount int + if err := h.db.Get(&episodeCount, "SELECT COUNT(*) FROM episodes WHERE order_id = ? 
AND deleted_at IS NULL", id); err != nil { + logger.Printf("[ORDER] Failed to check episode references: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete order"}) + return + } + if episodeCount > 0 { + c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("cannot delete order referenced by %d episodes", episodeCount)}) + return + } + + now := time.Now().UTC() + if _, err := h.db.Exec("UPDATE orders SET deleted_at = ?, updated_at = ? WHERE id = ? AND deleted_at IS NULL", now, now, id); err != nil { + logger.Printf("[ORDER] Failed to delete order: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete order"}) + return + } + + c.Status(http.StatusNoContent) +} diff --git a/internal/api/handlers/scene.go b/internal/api/handlers/scene.go index 60b4c62..8556409 100644 --- a/internal/api/handlers/scene.go +++ b/internal/api/handlers/scene.go @@ -544,6 +544,19 @@ func (h *SceneHandler) DeleteScene(c *gin.Context) { return } + // Check if scene is referenced by any orders + var orderCount int + err = h.db.Get(&orderCount, "SELECT COUNT(*) FROM orders WHERE scene_id = ? 
AND deleted_at IS NULL", id) + if err != nil { + logger.Printf("[SCENE] Failed to check order references: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete scene"}) + return + } + if orderCount > 0 { + c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("cannot delete scene referenced by %d orders", orderCount)}) + return + } + now := time.Now().UTC() // Perform soft delete by setting deleted_at diff --git a/internal/api/handlers/task.go b/internal/api/handlers/task.go index c6695fa..8368d18 100644 --- a/internal/api/handlers/task.go +++ b/internal/api/handlers/task.go @@ -6,7 +6,9 @@ package handlers import ( + "crypto/rand" "database/sql" + "encoding/hex" "fmt" "net/http" "strconv" @@ -20,6 +22,24 @@ import ( "archebase.com/keystone-edge/internal/services" ) +func newPublicTaskID(now time.Time, seq int) (string, error) { + // Format: task_YYYYMMDD_HHMMSS_mmm__ + // - millisecond timestamp makes it readable + // - seq differentiates multiple creates in same millisecond + // - rand suffix prevents collision across processes/hosts + b := make([]byte, 4) + if _, err := rand.Read(b); err != nil { + return "", err + } + return fmt.Sprintf( + "task_%s_%03d_%02d_%s", + now.UTC().Format("20060102_150405"), + now.UTC().Nanosecond()/1_000_000, + seq%100, + hex.EncodeToString(b), + ), nil +} + // TaskHandler handles task-related HTTP requests type TaskHandler struct { db *sqlx.DB @@ -36,24 +56,21 @@ func NewTaskHandler(db *sqlx.DB, hub *services.TransferHub) *TaskHandler { // TaskConfig represents the task configuration response type TaskConfig struct { - TaskID string `json:"task_id"` - DeviceID string `json:"device_id"` - Scene string `json:"scene"` - Subscene string `json:"subscene"` - InitialSceneLayout string `json:"initial_scene_layout"` - Skills []string `json:"skills"` - SOPID string `json:"sop_id"` - Topics []string `json:"topics"` - StartCallbackURL string `json:"start_callback_url"` - FinishCallbackURL string 
`json:"finish_callback_url"` - UserToken string `json:"user_token"` - RecordingConfig RecordingConfig `json:"recording_config"` -} - -// RecordingConfig represents recording configuration settings -type RecordingConfig struct { - MaxDurationSec int `json:"max_duration_sec"` - Compression string `json:"compression"` + TaskID string `json:"task_id"` + DeviceID string `json:"device_id"` + DataCollectorID string `json:"data_collector_id"` + OrderID string `json:"order_id"` + Factory string `json:"factory"` + Scene string `json:"scene"` + WorkstationID string `json:"workstation_id"` + Subscene string `json:"subscene"` + InitialSceneLayout string `json:"initial_scene_layout"` + Skills []string `json:"skills"` + SOPID string `json:"sop_id"` + Topics []string `json:"topics"` + StartCallbackURL string `json:"start_callback_url"` + FinishCallbackURL string `json:"finish_callback_url"` + UserToken string `json:"user_token"` } // RegisterRoutes registers task-related routes @@ -77,6 +94,7 @@ var validTaskStatuses = map[string]struct{}{ // TaskListItem represents a task item in list responses. type TaskListItem struct { ID string `json:"id" db:"id"` + TaskID string `json:"task_id" db:"task_id"` BatchID string `json:"batch_id" db:"batch_id"` OrderID string `json:"order_id" db:"order_id"` SOPID string `json:"sop_id" db:"sop_id"` @@ -105,6 +123,7 @@ type TaskEpisodeDetail struct { // TaskDetailResponse represents the response body for getting a task by ID. 
type TaskDetailResponse struct { ID string `json:"id" db:"id"` + TaskID string `json:"task_id" db:"task_id"` BatchID string `json:"batch_id" db:"batch_id"` BatchName string `json:"batch_name" db:"batch_name"` OrderID string `json:"order_id" db:"order_id"` @@ -117,6 +136,7 @@ type TaskDetailResponse struct { FactoryID *string `json:"factory_id" db:"factory_id"` OrganizationID *int64 `json:"organization_id" db:"organization_id"` Status string `json:"status" db:"status"` + CreatedAt *string `json:"created_at" db:"created_at"` AssignedAt *string `json:"assigned_at" db:"assigned_at"` StartedAt *string `json:"started_at" db:"started_at"` CompletedAt *string `json:"completed_at" db:"completed_at"` @@ -219,7 +239,8 @@ func (h *TaskHandler) ListTasks(c *gin.Context) { queryArgs := append(append([]interface{}{}, args...), limit, offset) listQuery := fmt.Sprintf(`SELECT - task_id AS id, + CAST(id AS CHAR) AS id, + task_id AS task_id, CAST(batch_id AS CHAR) AS batch_id, CAST(order_id AS CHAR) AS order_id, CAST(sop_id AS CHAR) AS sop_id, @@ -262,11 +283,17 @@ func (h *TaskHandler) ListTasks(c *gin.Context) { // @Failure 500 {object} map[string]string // @Router /tasks/{id} [get] func (h *TaskHandler) GetTask(c *gin.Context) { - taskID := strings.TrimSpace(c.Param("id")) + idStr := strings.TrimSpace(c.Param("id")) + id, err := strconv.ParseInt(idStr, 10, 64) + if err != nil || id <= 0 { + c.JSON(http.StatusBadRequest, gin.H{"error_msg": "invalid task id"}) + return + } var task TaskDetailResponse query := `SELECT - t.task_id AS id, + CAST(t.id AS CHAR) AS id, + t.task_id AS task_id, CAST(t.batch_id AS CHAR) AS batch_id, COALESCE(t.batch_name, '') AS batch_name, CAST(t.order_id AS CHAR) AS order_id, @@ -279,19 +306,20 @@ func (h *TaskHandler) GetTask(c *gin.Context) { CASE WHEN t.factory_id IS NULL THEN NULL ELSE CAST(t.factory_id AS CHAR) END AS factory_id, t.organization_id AS organization_id, t.status, - CASE WHEN t.assigned_at IS NULL THEN NULL ELSE 
DATE_FORMAT(CONVERT_TZ(t.assigned_at, @@session.time_zone, '+00:00'), '%%Y-%%m-%%dT%%H:%%i:%%sZ') END AS assigned_at, - CASE WHEN t.started_at IS NULL THEN NULL ELSE DATE_FORMAT(CONVERT_TZ(t.started_at, @@session.time_zone, '+00:00'), '%%Y-%%m-%%dT%%H:%%i:%%sZ') END AS started_at, - CASE WHEN t.completed_at IS NULL THEN NULL ELSE DATE_FORMAT(CONVERT_TZ(t.completed_at, @@session.time_zone, '+00:00'), '%%Y-%%m-%%dT%%H:%%i:%%sZ') END AS completed_at, + CASE WHEN t.created_at IS NULL THEN NULL ELSE DATE_FORMAT(CONVERT_TZ(t.created_at, @@session.time_zone, '+00:00'), '%Y-%m-%dT%H:%i:%sZ') END AS created_at, + CASE WHEN t.assigned_at IS NULL THEN NULL ELSE DATE_FORMAT(CONVERT_TZ(t.assigned_at, @@session.time_zone, '+00:00'), '%Y-%m-%dT%H:%i:%sZ') END AS assigned_at, + CASE WHEN t.started_at IS NULL THEN NULL ELSE DATE_FORMAT(CONVERT_TZ(t.started_at, @@session.time_zone, '+00:00'), '%Y-%m-%dT%H:%i:%sZ') END AS started_at, + CASE WHEN t.completed_at IS NULL THEN NULL ELSE DATE_FORMAT(CONVERT_TZ(t.completed_at, @@session.time_zone, '+00:00'), '%Y-%m-%dT%H:%i:%sZ') END AS completed_at, e.episode_id AS episode_id FROM tasks t LEFT JOIN episodes e ON e.task_id = t.id AND e.deleted_at IS NULL - WHERE t.task_id = ? AND t.deleted_at IS NULL + WHERE t.id = ? 
AND t.deleted_at IS NULL LIMIT 1` - err := h.db.Get(&task, query, taskID) + err = h.db.Get(&task, query, id) if err == sql.ErrNoRows { c.JSON(http.StatusNotFound, gin.H{ - "error_msg": "Task not found: " + taskID, + "error_msg": "Task not found: " + idStr, }) return } @@ -325,11 +353,16 @@ func (h *TaskHandler) GetTask(c *gin.Context) { // @Failure 500 {object} map[string]string // @Router /tasks/{id} [put] func (h *TaskHandler) UpdateTask(c *gin.Context) { - taskID := strings.TrimSpace(c.Param("id")) - if taskID == "" { + idStr := strings.TrimSpace(c.Param("id")) + if idStr == "" { c.JSON(http.StatusBadRequest, gin.H{"error_msg": "task id is required"}) return } + id, err := strconv.ParseInt(idStr, 10, 64) + if err != nil || id <= 0 { + c.JSON(http.StatusBadRequest, gin.H{"error_msg": "invalid task id"}) + return + } var req UpdateTaskRequest if err := c.ShouldBindJSON(&req); err != nil { @@ -360,9 +393,9 @@ func (h *TaskHandler) UpdateTask(c *gin.Context) { var taskRow struct { Status string `db:"status"` } - err := h.db.Get(&taskRow, "SELECT status FROM tasks WHERE task_id = ? AND deleted_at IS NULL", taskID) + err = h.db.Get(&taskRow, "SELECT status FROM tasks WHERE id = ? AND deleted_at IS NULL", id) if err == sql.ErrNoRows { - c.JSON(http.StatusNotFound, gin.H{"error_msg": "Task not found: " + taskID}) + c.JSON(http.StatusNotFound, gin.H{"error_msg": "Task not found: " + idStr}) return } if err != nil { @@ -382,12 +415,12 @@ func (h *TaskHandler) UpdateTask(c *gin.Context) { now := time.Now().UTC() result, err := h.db.Exec( - "UPDATE tasks SET status = ?, updated_at = ?, ready_at = CASE WHEN ? = 'ready' THEN ? ELSE ready_at END WHERE task_id = ? AND status = ? AND deleted_at IS NULL", + "UPDATE tasks SET status = ?, updated_at = ?, ready_at = CASE WHEN ? = 'ready' THEN ? ELSE ready_at END WHERE id = ? AND status = ? 
AND deleted_at IS NULL", req.Status, now, req.Status, now, - taskID, + id, taskRow.Status, ) if err != nil { @@ -413,7 +446,7 @@ func (h *TaskHandler) UpdateTask(c *gin.Context) { } c.JSON(http.StatusOK, UpdateTaskResponse{ - ID: taskID, + ID: idStr, Status: req.Status, UpdatedAt: now.Format(time.RFC3339), }) @@ -421,9 +454,20 @@ func (h *TaskHandler) UpdateTask(c *gin.Context) { // CreateTaskResponse represents the response body for creating a task. type CreateTaskResponse struct { - ID string `json:"id"` - Status string `json:"status"` - CreatedAt string `json:"created_at"` + ID string `json:"id"` + TaskID string `json:"task_id"` + Status string `json:"status"` + CreatedAt string `json:"created_at"` + Tasks []CreateTaskResponse `json:"tasks"` +} + +// CreateTaskRequest represents the request body for creating a task. +type CreateTaskRequest struct { + OrderID int64 `json:"order_id" binding:"required"` + SOPID int64 `json:"sop_id" binding:"required"` + SubsceneID int64 `json:"subscene_id" binding:"required"` + WorkstationID int64 `json:"workstation_id" binding:"required"` + Quantity *int `json:"quantity,omitempty"` } // CreateTask handles task creation requests. 
@@ -437,46 +481,267 @@ type CreateTaskResponse struct { // @Failure 500 {object} map[string]string // @Router /tasks [post] func (h *TaskHandler) CreateTask(c *gin.Context) { + var req CreateTaskRequest + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{ + "error_msg": "Invalid request body: " + err.Error(), + }) + return + } + now := time.Now().UTC() - taskID := now.Format("task_20060102_150405") - - _, err := h.db.Exec( - `INSERT INTO tasks ( - task_id, - batch_id, - order_id, - sop_id, - workstation_id, - scene_id, - subscene_id, - status, - created_at, - updated_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, - taskID, - 0, - 0, - 0, - nil, - 0, - 0, - "pending", - now, - now, - ) + quantity := 1 + if req.Quantity != nil { + quantity = *req.Quantity + } + if quantity < 1 { + c.JSON(http.StatusBadRequest, gin.H{"error_msg": "Invalid quantity: must be >= 1"}) + return + } + if quantity > 1000 { + c.JSON(http.StatusBadRequest, gin.H{"error_msg": "Invalid quantity: must be <= 1000"}) + return + } + + tx, err := h.db.Beginx() if err != nil { - logger.Printf("[TASK] Failed to insert task: %v", err) - c.JSON(http.StatusInternalServerError, gin.H{ - "error": "failed to create task", + logger.Printf("[TASK] Failed to start transaction: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error_msg": "failed to create task"}) + return + } + defer func() { + _ = tx.Rollback() + }() + + // Validate referenced rows exist (all IDs are table `id` fields). + var targetCount int + // Lock the order row to coordinate concurrent task generation for this order. + if err := tx.Get(&targetCount, "SELECT target_count FROM orders WHERE id = ? 
AND deleted_at IS NULL LIMIT 1 FOR UPDATE", req.OrderID); err != nil { + if err == sql.ErrNoRows { + c.JSON(http.StatusBadRequest, gin.H{"error_msg": fmt.Sprintf("Invalid order_id: %d", req.OrderID)}) + return + } + logger.Printf("[TASK] Failed to validate order_id: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error_msg": "failed to create task"}) + return + } + // Lock existing tasks for this order, so concurrent inserts will serialize under REPEATABLE READ. + var lockedTaskIDs []int64 + if err := tx.Select(&lockedTaskIDs, "SELECT id FROM tasks WHERE order_id = ? AND deleted_at IS NULL FOR UPDATE", req.OrderID); err != nil { + logger.Printf("[TASK] Failed to lock existing tasks: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error_msg": "failed to create task"}) + return + } + existingCount := len(lockedTaskIDs) + if existingCount+quantity > targetCount { + remaining := targetCount - existingCount + if remaining < 0 { + remaining = 0 + } + c.JSON(http.StatusBadRequest, gin.H{ + "error_msg": fmt.Sprintf("Invalid quantity: order target_count=%d, existing_tasks=%d, remaining=%d", targetCount, existingCount, remaining), + }) + return + } + + var exists int + if err := tx.Get(&exists, "SELECT 1 FROM sops WHERE id = ? 
AND deleted_at IS NULL LIMIT 1", req.SOPID); err != nil { + if err == sql.ErrNoRows { + c.JSON(http.StatusBadRequest, gin.H{"error_msg": fmt.Sprintf("Invalid sop_id: %d", req.SOPID)}) + return + } + logger.Printf("[TASK] Failed to validate sop_id: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error_msg": "failed to create task"}) + return + } + type subsceneRow struct { + ID int64 `db:"id"` + SceneID int64 `db:"scene_id"` + Scene string `db:"scene_name"` + Name string `db:"name"` + Layout string `db:"initial_scene_layout"` + } + var subscene subsceneRow + if err := tx.Get(&subscene, ` + SELECT + ss.id, + ss.scene_id, + s.name AS scene_name, + ss.name, + COALESCE(ss.initial_scene_layout, '') AS initial_scene_layout + FROM subscenes ss + JOIN scenes s ON s.id = ss.scene_id AND s.deleted_at IS NULL + WHERE ss.id = ? AND ss.deleted_at IS NULL + LIMIT 1`, req.SubsceneID); err != nil { + if err == sql.ErrNoRows { + c.JSON(http.StatusBadRequest, gin.H{"error_msg": fmt.Sprintf("Invalid subscene_id: %d", req.SubsceneID)}) + return + } + logger.Printf("[TASK] Failed to validate subscene_id: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error_msg": "failed to create task"}) + return + } + type workstationRow struct { + ID int64 `db:"id"` + FactoryID int64 `db:"factory_id"` + } + var ws workstationRow + if err := tx.Get(&ws, "SELECT id, factory_id FROM workstations WHERE id = ? AND deleted_at IS NULL LIMIT 1", req.WorkstationID); err != nil { + if err == sql.ErrNoRows { + c.JSON(http.StatusBadRequest, gin.H{"error_msg": fmt.Sprintf("Invalid workstation_id: %d", req.WorkstationID)}) + return + } + logger.Printf("[TASK] Failed to validate workstation_id: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error_msg": "failed to create task"}) + return + } + + // Ensure a batch exists for (order_id, workstation_id). Prefer active/pending, otherwise create. 
+ type batchRow struct { + ID int64 `db:"id"` + Name string `db:"name"` + } + var batch batchRow + batchQuery := ` + SELECT id, name + FROM batches + WHERE order_id = ? AND workstation_id = ? AND deleted_at IS NULL AND status IN ('pending', 'active') + ORDER BY created_at DESC, id DESC + LIMIT 1` + err = tx.Get(&batch, batchQuery, req.OrderID, req.WorkstationID) + if err != nil && err != sql.ErrNoRows { + logger.Printf("[TASK] Failed to query batch: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error_msg": "failed to create task"}) + return + } + if err == sql.ErrNoRows { + batchIDStr := now.Format("batch_20060102_150405") + batchName := fmt.Sprintf("Batch %s (order=%d ws=%d)", batchIDStr, req.OrderID, req.WorkstationID) + res, err := tx.Exec( + `INSERT INTO batches ( + batch_id, + order_id, + workstation_id, + name, + status, + created_at, + updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?)`, + batchIDStr, + req.OrderID, + req.WorkstationID, + batchName, + "pending", + now, + now, + ) + if err != nil { + logger.Printf("[TASK] Failed to insert batch: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error_msg": "failed to create task"}) + return + } + newID, err := res.LastInsertId() + if err != nil { + logger.Printf("[TASK] Failed to get batch insert id: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error_msg": "failed to create task"}) + return + } + batch.ID = newID + batch.Name = batchName + } + + // Denormalized filtering fields + var organizationID int64 + if err := tx.Get(&organizationID, "SELECT organization_id FROM factories WHERE id = ? 
AND deleted_at IS NULL LIMIT 1", ws.FactoryID); err != nil { + if err == sql.ErrNoRows { + c.JSON(http.StatusBadRequest, gin.H{"error_msg": "workstation factory not found"}) + return + } + logger.Printf("[TASK] Failed to resolve organization_id: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error_msg": "failed to create task"}) + return + } + + created := make([]CreateTaskResponse, 0, quantity) + for i := 0; i < quantity; i++ { + taskID, err := newPublicTaskID(now, i) + if err != nil { + logger.Printf("[TASK] Failed to generate task_id: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error_msg": "failed to create task"}) + return + } + resTask, err := tx.Exec( + `INSERT INTO tasks ( + task_id, + batch_id, + order_id, + sop_id, + workstation_id, + scene_id, + subscene_id, + batch_name, + scene_name, + subscene_name, + factory_id, + organization_id, + initial_scene_layout, + status, + created_at, + updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + taskID, + batch.ID, + req.OrderID, + req.SOPID, + req.WorkstationID, + subscene.SceneID, + req.SubsceneID, + batch.Name, + subscene.Scene, + subscene.Name, + ws.FactoryID, + organizationID, + subscene.Layout, + "pending", + now, + now, + ) + if err != nil { + logger.Printf("[TASK] Failed to insert task: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error_msg": "failed to create task"}) + return + } + newTaskID, err := resTask.LastInsertId() + if err != nil { + logger.Printf("[TASK] Failed to get task insert id: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error_msg": "failed to create task"}) + return + } + created = append(created, CreateTaskResponse{ + ID: fmt.Sprintf("%d", newTaskID), + TaskID: taskID, + Status: "pending", + CreatedAt: now.Format(time.RFC3339), }) + } + + if err := tx.Commit(); err != nil { + logger.Printf("[TASK] Failed to commit transaction: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error_msg": "failed to create 
task"}) return } - c.JSON(http.StatusCreated, CreateTaskResponse{ - ID: taskID, + resp := CreateTaskResponse{ Status: "pending", CreatedAt: now.Format(time.RFC3339), - }) + Tasks: created, + } + // Backwards compatibility for clients expecting a single task. + if len(created) > 0 { + resp.ID = created[0].ID + resp.TaskID = created[0].TaskID + } + c.JSON(http.StatusCreated, resp) } // RegisterCallbackRoutes registers callback routes for handling external events. @@ -654,26 +919,154 @@ func (h *TaskHandler) OnRecordingFinish(c *gin.Context) { // @Failure 500 {object} map[string]string // @Router /tasks/{id}/config [get] func (h *TaskHandler) GetTaskConfig(c *gin.Context) { - taskID := c.Param("id") + idStr := strings.TrimSpace(c.Param("id")) + id, err := strconv.ParseInt(idStr, 10, 64) + if err != nil || id <= 0 { + c.JSON(http.StatusBadRequest, gin.H{"error_msg": "invalid task id"}) + return + } + + type taskConfigRow struct { + TaskID string `db:"task_id"` + WorkstationID sql.NullInt64 `db:"workstation_id"` + RobotSerial sql.NullString `db:"robot_serial"` + RobotID sql.NullInt64 `db:"robot_id"` + CollectorName sql.NullString `db:"collector_name"` + OrderName sql.NullString `db:"order_name"` + Workstation sql.NullString `db:"workstation_name"` + FactoryName sql.NullString `db:"factory_name"` + SceneName sql.NullString `db:"scene_name"` + SubsceneName sql.NullString `db:"subscene_name"` + Layout sql.NullString `db:"initial_scene_layout"` + SOPName sql.NullString `db:"sop_name"` + SkillSequence sql.NullString `db:"skill_sequence"` + ROSTopics sql.NullString `db:"ros_topics"` + } + + var row taskConfigRow + if err := h.db.Get(&row, ` + SELECT + t.task_id AS task_id, + t.workstation_id AS workstation_id, + ws.robot_serial AS robot_serial, + ws.robot_id AS robot_id, + COALESCE(ws.collector_name, '') AS collector_name, + COALESCE(o.name, '') AS order_name, + COALESCE(ws.name, '') AS workstation_name, + COALESCE(f.name, '') AS factory_name, + COALESCE(t.scene_name, '') AS 
scene_name, + COALESCE(t.subscene_name, '') AS subscene_name, + COALESCE(t.initial_scene_layout, '') AS initial_scene_layout, + s.name AS sop_name, + COALESCE(s.skill_sequence, '[]') AS skill_sequence, + COALESCE(rt.ros_topics, '[]') AS ros_topics + FROM tasks t + LEFT JOIN factories f ON f.id = t.factory_id AND f.deleted_at IS NULL + LEFT JOIN orders o ON o.id = t.order_id AND o.deleted_at IS NULL + LEFT JOIN workstations ws ON ws.id = t.workstation_id AND ws.deleted_at IS NULL + LEFT JOIN robots r ON r.id = ws.robot_id AND r.deleted_at IS NULL + LEFT JOIN robot_types rt ON rt.id = r.robot_type_id AND rt.deleted_at IS NULL + LEFT JOIN sops s ON s.id = t.sop_id AND s.deleted_at IS NULL + WHERE t.id = ? AND t.deleted_at IS NULL + LIMIT 1 + `, id); err != nil { + if err == sql.ErrNoRows { + c.JSON(http.StatusNotFound, gin.H{"error_msg": "Task not found: " + idStr}) + return + } + logger.Printf("[TASK] Failed to query task for config: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error_msg": "Failed to query task"}) + return + } + + if !row.WorkstationID.Valid { + c.JSON(http.StatusConflict, gin.H{"error_msg": "Task has no workstation assigned"}) + return + } + if !row.RobotSerial.Valid || strings.TrimSpace(row.RobotSerial.String) == "" { + c.JSON(http.StatusConflict, gin.H{"error_msg": fmt.Sprintf("Workstation %d has no robot_serial", row.WorkstationID.Int64)}) + return + } + if !row.RobotID.Valid || row.RobotID.Int64 <= 0 { + c.JSON(http.StatusConflict, gin.H{"error_msg": fmt.Sprintf("Workstation %d has no robot_id", row.WorkstationID.Int64)}) + return + } + if strings.TrimSpace(row.CollectorName.String) == "" { + c.JSON(http.StatusConflict, gin.H{"error_msg": fmt.Sprintf("Workstation %d has no collector_name", row.WorkstationID.Int64)}) + return + } + if strings.TrimSpace(row.OrderName.String) == "" { + c.JSON(http.StatusConflict, gin.H{"error_msg": "Task order_id not found"}) + return + } + if strings.TrimSpace(row.Workstation.String) == "" { + 
c.JSON(http.StatusConflict, gin.H{"error_msg": fmt.Sprintf("Workstation %d has no name", row.WorkstationID.Int64)}) + return + } + if strings.TrimSpace(row.FactoryName.String) == "" { + c.JSON(http.StatusConflict, gin.H{"error_msg": "Task factory_id not found"}) + return + } + if !row.ROSTopics.Valid || strings.TrimSpace(row.ROSTopics.String) == "" { + c.JSON(http.StatusConflict, gin.H{"error_msg": fmt.Sprintf("Robot %d has no robot_type ros_topics", row.RobotID.Int64)}) + return + } + if !row.SOPName.Valid || strings.TrimSpace(row.SOPName.String) == "" { + c.JSON(http.StatusConflict, gin.H{"error_msg": "Task sop_id not found"}) + return + } + + // Resolve skill ids (from sop.skill_sequence) to skill names, preserving order. + skillIDs := parseJSONArray(row.SkillSequence.String) + skills := make([]string, 0, len(skillIDs)) + if len(skillIDs) > 0 { + type skillRow struct { + ID string `db:"id"` + Name string `db:"name"` + } + query, args, err := sqlx.In( + `SELECT CAST(id AS CHAR) AS id, name FROM skills WHERE deleted_at IS NULL AND id IN (?)`, + skillIDs, + ) + if err != nil { + logger.Printf("[TASK] Failed to build skill query: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error_msg": "Failed to query skills"}) + return + } + query = h.db.Rebind(query) + var rows []skillRow + if err := h.db.Select(&rows, query, args...); err != nil { + logger.Printf("[TASK] Failed to query skills: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error_msg": "Failed to query skills"}) + return + } + nameByID := make(map[string]string, len(rows)) + for _, r := range rows { + nameByID[strings.TrimSpace(r.ID)] = strings.TrimSpace(r.Name) + } + for _, id := range skillIDs { + if name, ok := nameByID[strings.TrimSpace(id)]; ok && name != "" { + skills = append(skills, name) + } + } + } - // Return mocked data - // #nosec G101 - This is a mock response for testing purposes. 
taskConfig := TaskConfig{ - TaskID: taskID, - DeviceID: "robot_arm_001", - Scene: "commercial_kitchen", - Subscene: "dishwashing_station", - InitialSceneLayout: "A rectangular table (80cm x 120cm) in the center of the kitchen with a sink on the left side. The robot arm is positioned at the edge of the table.", - Skills: []string{"pick", "place", "navigate"}, - SOPID: "sop_dish_cleaning_v2", - Topics: []string{"/camera/color/image_raw", "/camera/depth/image_rect_raw", "/joint_states", "/gripper/state", "/odom"}, + TaskID: row.TaskID, + DeviceID: strings.TrimSpace(row.RobotSerial.String), + DataCollectorID: strings.TrimSpace(row.CollectorName.String), + OrderID: strings.TrimSpace(row.OrderName.String), + Factory: strings.TrimSpace(row.FactoryName.String), + Scene: strings.TrimSpace(row.SceneName.String), + WorkstationID: strings.TrimSpace(row.Workstation.String), + Subscene: strings.TrimSpace(row.SubsceneName.String), + InitialSceneLayout: strings.TrimSpace(row.Layout.String), + Skills: skills, + SOPID: strings.TrimSpace(row.SOPName.String), + Topics: parseJSONArray(row.ROSTopics.String), StartCallbackURL: "http://keystone.factory.internal/api/v1/callbacks/start", FinishCallbackURL: "http://keystone.factory.internal/api/v1/callbacks/finish", - UserToken: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ1c2VyXzAwMSIsInNjb3BlIjpbImRldmljZSJdLCJleHAiOjE3MzY4MTIwMDB9.mock_signature", - RecordingConfig: RecordingConfig{ - MaxDurationSec: 600, - Compression: "zstd", - }, + UserToken: "", } c.JSON(http.StatusOK, taskConfig) diff --git a/internal/api/handlers/transfer.go b/internal/api/handlers/transfer.go index 6c9d8ee..aff563f 100644 --- a/internal/api/handlers/transfer.go +++ b/internal/api/handlers/transfer.go @@ -9,6 +9,7 @@ import ( "context" "database/sql" "encoding/json" + "errors" "fmt" "net" "net/http" @@ -310,6 +311,14 @@ func (h *TransferHandler) onUploadComplete(ctx context.Context, dc *services.Tra } }() + // Resolve task numeric ID early for status updates. 
+ var taskPK int64 + if err := tx.QueryRowContext(ctx, "SELECT id FROM tasks WHERE task_id = ? AND deleted_at IS NULL", taskID).Scan(&taskPK); err != nil { + // #nosec G706 -- Set aside for now + logger.Printf("[TRANSFER] Device %s: failed to resolve task id for task=%s: %v", dc.DeviceID, taskID, err) + return + } + // Check if mcap_path and sidecar_path already exist in database var count int err = tx.QueryRowContext(ctx, @@ -362,40 +371,89 @@ func (h *TransferHandler) onUploadComplete(ctx context.Context, dc *services.Tra return } - episodeID := uuid.New().String() - _, dbErr := tx.ExecContext(ctx, - `INSERT INTO episodes ( - episode_id, - task_id, - batch_id, - order_id, - scene_id, - scene_name, - workstation_id, - factory_id, - organization_id, - sop_id, - mcap_path, - sidecar_path - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, - episodeID, - taskRow.ID, - taskRow.BatchID, - taskRow.OrderID, - taskRow.SceneID, - taskRow.SceneName, - taskRow.WorkstationID, - taskRow.FactoryID, - taskRow.OrganizationID, - taskRow.SOPID, - mcapKey, - jsonKey, - ) - if dbErr != nil { + // Idempotency: avoid creating duplicate episodes for the same task. + // This keeps batches.episode_count correct even if the device retries uploads. + var existingEpisodeID string + err := tx.QueryRowContext(ctx, ` + SELECT episode_id + FROM episodes + WHERE task_id = ? 
AND deleted_at IS NULL + LIMIT 1 + `, taskRow.ID).Scan(&existingEpisodeID) + + if err == nil && existingEpisodeID == "" { // #nosec G706 -- Set aside for now - logger.Printf("[TRANSFER] Device %s: DB insert failed for task=%s: %v", dc.DeviceID, taskID, dbErr) + logger.Printf("[TRANSFER] Device %s: data corruption: empty episode_id found for task_pk=%d task=%s", dc.DeviceID, taskRow.ID, taskID) return } + if err != nil && !errors.Is(err, sql.ErrNoRows) { + // #nosec G706 -- Set aside for now + logger.Printf("[TRANSFER] Device %s: DB query failed for existing episode check task_pk=%d task=%s: %v", dc.DeviceID, taskRow.ID, taskID, err) + return + } + + if errors.Is(err, sql.ErrNoRows) { + episodeID := uuid.New().String() + _, dbErr := tx.ExecContext(ctx, + `INSERT INTO episodes ( + episode_id, + task_id, + batch_id, + order_id, + scene_id, + scene_name, + workstation_id, + factory_id, + organization_id, + sop_id, + mcap_path, + sidecar_path + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + episodeID, + taskRow.ID, + taskRow.BatchID, + taskRow.OrderID, + taskRow.SceneID, + taskRow.SceneName, + taskRow.WorkstationID, + taskRow.FactoryID, + taskRow.OrganizationID, + taskRow.SOPID, + mcapKey, + jsonKey, + ) + if dbErr != nil { + // #nosec G706 -- Set aside for now + logger.Printf("[TRANSFER] Device %s: DB insert failed for task=%s: %v", dc.DeviceID, taskID, dbErr) + return + } + + // Write-time maintenance for batch episode_count. + if _, dbErr := tx.ExecContext(ctx, ` + UPDATE batches + SET episode_count = episode_count + 1, updated_at = updated_at + WHERE id = ? AND deleted_at IS NULL + `, taskRow.BatchID); dbErr != nil { + // #nosec G706 -- Set aside for now + logger.Printf("[TRANSFER] Device %s: DB update failed for batch=%d task=%s: %v", dc.DeviceID, taskRow.BatchID, taskID, dbErr) + return + } + } + } + + // Episode is confirmed (inserted or already existed). Mark task as completed. 
+ now := time.Now().UTC() + if _, dbErr := tx.ExecContext(ctx, ` + UPDATE tasks + SET + status = 'completed', + completed_at = CASE WHEN completed_at IS NULL THEN ? ELSE completed_at END, + updated_at = ? + WHERE id = ? AND deleted_at IS NULL + `, now, now, taskPK); dbErr != nil { + // #nosec G706 -- Set aside for now + logger.Printf("[TRANSFER] Device %s: DB update failed for task=%s: %v", dc.DeviceID, taskID, dbErr) + return } // Commit transaction diff --git a/internal/server/server.go b/internal/server/server.go index 2849062..df3c3dc 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -36,6 +36,7 @@ type Server struct { recorder *handlers.RecorderHandler episode *handlers.EpisodeHandler task *handlers.TaskHandler + batch *handlers.BatchHandler robotType *handlers.RobotTypeHandler robot *handlers.RobotHandler factory *handlers.FactoryHandler @@ -47,6 +48,7 @@ type Server struct { sop *handlers.SOPHandler scene *handlers.SceneHandler subscene *handlers.SubsceneHandler + order *handlers.OrderHandler httpServer *http.Server transferWSServer *http.Server recorderWSServer *http.Server @@ -83,6 +85,7 @@ func New(cfg *config.Config, db *sqlx.DB, s3Client *s3.Client) *Server { // Create database-dependent handlers only when DB is available var ( + batchHandler *handlers.BatchHandler robotTypeHandler *handlers.RobotTypeHandler robotHandler *handlers.RobotHandler factoryHandler *handlers.FactoryHandler @@ -94,8 +97,10 @@ func New(cfg *config.Config, db *sqlx.DB, s3Client *s3.Client) *Server { sopHandler *handlers.SOPHandler sceneHandler *handlers.SceneHandler subsceneHandler *handlers.SubsceneHandler + orderHandler *handlers.OrderHandler ) if db != nil { + batchHandler = handlers.NewBatchHandler(db) robotTypeHandler = handlers.NewRobotTypeHandler(db) robotHandler = handlers.NewRobotHandler(db, recorderHub, transferHub) factoryHandler = handlers.NewFactoryHandler(db) @@ -107,6 +112,7 @@ func New(cfg *config.Config, db *sqlx.DB, s3Client *s3.Client) 
*Server { sopHandler = handlers.NewSOPHandler(db) sceneHandler = handlers.NewSceneHandler(db) subsceneHandler = handlers.NewSubsceneHandler(db) + orderHandler = handlers.NewOrderHandler(db) } s := &Server{ @@ -116,6 +122,7 @@ func New(cfg *config.Config, db *sqlx.DB, s3Client *s3.Client) *Server { recorder: recorderHandler, episode: episodeHandler, task: taskHandler, + batch: batchHandler, robotType: robotTypeHandler, robot: robotHandler, factory: factoryHandler, @@ -127,6 +134,7 @@ func New(cfg *config.Config, db *sqlx.DB, s3Client *s3.Client) *Server { sop: sopHandler, scene: sceneHandler, subscene: subsceneHandler, + order: orderHandler, engine: engine, } @@ -185,6 +193,9 @@ func (s *Server) buildRoutes() http.Handler { // Tasks API v1Tasks := v1.Group("") s.task.RegisterRoutes(v1Tasks) + if s.batch != nil { + s.batch.RegisterRoutes(v1Tasks) + } if s.robotType != nil { s.robotType.RegisterRoutes(v1Tasks) } @@ -218,6 +229,9 @@ func (s *Server) buildRoutes() http.Handler { if s.subscene != nil { s.subscene.RegisterRoutes(v1Tasks) } + if s.order != nil { + s.order.RegisterRoutes(v1Tasks) + } // Axon callbacks v1Callbacks := v1.Group("/callbacks") diff --git a/internal/storage/database/migrations/000001_initial_schema.up.sql b/internal/storage/database/migrations/000001_initial_schema.up.sql index fd3dd0a..8366072 100644 --- a/internal/storage/database/migrations/000001_initial_schema.up.sql +++ b/internal/storage/database/migrations/000001_initial_schema.up.sql @@ -219,7 +219,7 @@ CREATE TABLE IF NOT EXISTS orders ( id BIGINT AUTO_INCREMENT PRIMARY KEY, organization_id BIGINT NOT NULL, scene_id BIGINT NOT NULL, - name VARCHAR(255), + name VARCHAR(255) NOT NULL, target_count INT NOT NULL, priority ENUM('low', 'normal', 'high', 'urgent') DEFAULT 'normal', status ENUM('created', 'in_progress', 'paused', 'completed', 'cancelled') DEFAULT 'created', @@ -228,7 +228,7 @@ CREATE TABLE IF NOT EXISTS orders ( created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at 
TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, deleted_at TIMESTAMP NULL, - _name_unique VARCHAR(600) GENERATED ALWAYS AS (CONCAT(IFNULL(organization_id, ''), '|', IFNULL(scene_id, ''), '|', IFNULL(name, ''), '|', IFNULL(deleted_at, ''))) STORED, + _name_unique VARCHAR(600) GENERATED ALWAYS AS (CONCAT(IFNULL(organization_id, ''), '|', IFNULL(name, ''), '|', IFNULL(deleted_at, ''))) STORED, UNIQUE INDEX idx_name_del (_name_unique), INDEX idx_org (organization_id), INDEX idx_scene (scene_id),