diff --git a/.gitignore b/.gitignore index 1ca4dd7..d39da4f 100644 --- a/.gitignore +++ b/.gitignore @@ -81,6 +81,9 @@ release/ # Claude Code .claude/settings.local.json +# Agent guides (local only) +AGENTS.md + # swagger docs/docs.go docs/swagger.json diff --git a/ROADMAP.md b/ROADMAP.md index 1f8b2c4..0be007f 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -54,16 +54,16 @@ First Release Implementation Order: Second Release Implementation Order: 1. **Authentication & Authorization** - - JWT authentication + - ✅ JWT authentication - Role-based access control (RBAC) - API private key management 2. **Scene & Skill Management** - ✅ scene and subscene CRUD - ✅ skill and sop CRUD 3. **Order & Task Management** - - order CRUD - - batch CRUD - - task dispatch + - ✅ order CRUD + - ✅ batch CRUD + - ✅ task dispatch lifecycle 4. **upload queue** - Upload prioritization and bandwidth throttling 5. **QA Pipeline** diff --git a/internal/api/handlers/axon_rpc.go b/internal/api/handlers/axon_rpc.go index 7722f91..1049243 100644 --- a/internal/api/handlers/axon_rpc.go +++ b/internal/api/handlers/axon_rpc.go @@ -152,21 +152,28 @@ func (h *RecorderHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request logger.Printf("[RECORDER] Device %s: WebSocket accept error: %v", deviceID, err) return } + + remoteIP := extractIP(r.RemoteAddr) + rc := h.hub.NewRecorderConn(conn, deviceID, remoteIP) + if !h.hub.Connect(deviceID, rc) { + logger.Printf("[RECORDER] Device %s: connection rejected (already connected)", deviceID) + if err := conn.Close(websocket.StatusPolicyViolation, "device already connected"); err != nil { + logger.Printf("[RECORDER] Device %s: WebSocket close error: %v", deviceID, err) + } + return + } + defer func() { if err := conn.Close(websocket.StatusNormalClosure, ""); err != nil { logger.Printf("[RECORDER] Device %s: WebSocket close error: %v", deviceID, err) } }() + defer h.hub.Disconnect(deviceID, rc) + defer revertRunnableTasksOnDeviceDisconnect(h.db, deviceID, nil, 0, false) ctx := r.Context() go h.pingLoop(ctx, conn) - remoteIP := extractIP(r.RemoteAddr) - rc := h.hub.NewRecorderConn(conn, deviceID, remoteIP) - h.hub.Connect(deviceID, rc) - defer h.hub.Disconnect(deviceID) - defer revertRunnableTasksOnDeviceDisconnect(h.db, deviceID, nil, 0, false) - // #nosec G706 -- Set aside for now logger.Printf("[RECORDER] Recorder %s connected from %s", deviceID, remoteIP) diff --git a/internal/api/handlers/batch.go b/internal/api/handlers/batch.go index c91a8c8..1a78f54 100644 --- a/internal/api/handlers/batch.go +++ b/internal/api/handlers/batch.go @@ -123,10 +123,12 @@ type BatchListItem struct { // ListBatchesResponse represents the response body for listing batches. type ListBatchesResponse struct { - Batches []BatchListItem `json:"batches"` + Items []BatchListItem `json:"items"` Total int `json:"total"` Limit int `json:"limit"` Offset int `json:"offset"` + HasNext bool `json:"hasNext,omitempty"` + HasPrev bool `json:"hasPrev,omitempty"` } type batchRow struct { @@ -145,8 +147,8 @@ type batchRow struct { StartedAt sql.NullTime `db:"started_at"` EndedAt sql.NullTime `db:"ended_at"` Metadata sql.NullString `db:"metadata"` - CreatedAt sql.NullString `db:"created_at"` - UpdatedAt sql.NullString `db:"updated_at"` + CreatedAt sql.NullTime `db:"created_at"` + UpdatedAt sql.NullTime `db:"updated_at"` } func parseNullableJSON(v sql.NullString) any { @@ -182,11 +184,11 @@ func batchListItemFromRow(r batchRow) BatchListItem { createdAt := "" if r.CreatedAt.Valid { - createdAt = r.CreatedAt.String + createdAt = r.CreatedAt.Time.UTC().Format(time.RFC3339) } updatedAt := "" if r.UpdatedAt.Valid { - updatedAt = r.UpdatedAt.String + updatedAt = r.UpdatedAt.Time.UTC().Format(time.RFC3339) } nameOut := "" @@ -339,11 +341,16 @@ func (h *BatchHandler) ListBatches(c *gin.Context) { batches = append(batches, batchListItemFromRow(r)) } + hasNext := (offset + limit) < total + hasPrev := offset > 0 + c.JSON(http.StatusOK, ListBatchesResponse{ - Batches: batches, + Items: batches, Total: total, Limit: limit, Offset: offset, + HasNext: hasNext, + HasPrev: hasPrev, }) } @@ -1442,6 +1449,10 @@ func (h *BatchHandler) AdjustBatchTasks(c *gin.Context) { return } + // If task deletions/inserts made the batch terminal, advance status accordingly. + // Example: target reduced from 2 -> 1 after 1 task completed; remaining tasks may all be terminal. + tryAdvanceBatchStatus(h.db, batchNumID) + c.JSON(http.StatusOK, AdjustBatchTasksResponse{ CreatedTasks: createdTasks, DeletedTaskIDs: deletedTaskIDs, @@ -1650,7 +1661,7 @@ func (h *BatchHandler) ListBatchTasks(c *gin.Context) { } c.JSON(http.StatusOK, ListTasksResponse{ - Tasks: items, + Items: items, Total: len(items), Limit: len(items), Offset: 0, @@ -1815,8 +1826,24 @@ func tryAdvanceBatchStatus(db *sqlx.DB, batchID int64) { switch info.Status { case "pending": - // Advance to active: a task just reached a terminal state, so this batch has started. - // Then immediately re-evaluate completion: it's possible all tasks are already terminal. + // Advance to active only if at least one task is already in terminal state. + // Historically this function was only called after a task reached terminal state, + // but it may also be invoked after batch task adjustments (create/delete), so we + // must guard against incorrectly advancing a never-started batch. + var terminalCount int + if err := tx.Get(&terminalCount, ` + SELECT COUNT(*) FROM tasks + WHERE batch_id = ? AND deleted_at IS NULL + AND status IN ('completed', 'failed', 'cancelled')`, + batchID); err != nil { + logger.Printf("[BATCH] tryAdvanceBatchStatus: failed to count terminal tasks for batch %d: %v", batchID, err) + return + } + if terminalCount == 0 { + return + } + + // Now it's safe to mark started; then re-evaluate completion. if _, err := tx.Exec( "UPDATE batches SET status = 'active', started_at = ?, updated_at = ? WHERE id = ? AND status = 'pending' AND deleted_at IS NULL", now, now, batchID, diff --git a/internal/api/handlers/common.go b/internal/api/handlers/common.go index 5130009..9be5d60 100644 --- a/internal/api/handlers/common.go +++ b/internal/api/handlers/common.go @@ -7,18 +7,80 @@ package handlers import ( "database/sql" "encoding/json" + "net/http" + "strconv" "strings" - "time" "unicode" "unicode/utf8" + + "github.com/gin-gonic/gin" ) -// maxSlugLength matches VARCHAR(100) for slug columns in the schema. -const maxSlugLength = 100 +const ( + defaultLimit = 50 + maxLimit = 100 + maxSlugLength = 100 +) -// invalidSlugUserMessage is returned when slug fails isValidSlug (length or charset). const invalidSlugUserMessage = "slug must be at most 100 characters and contain only letters, digits, and hyphens" +// PaginationParams represents normalized pagination input from query parameters. +type PaginationParams struct { + Limit int + Offset int +} + +// ListResponse is a generic list payload with pagination metadata. +type ListResponse struct { + Items interface{} `json:"items"` + Total int `json:"total"` + Limit int `json:"limit"` + Offset int `json:"offset"` + HasNext bool `json:"hasNext,omitempty"` + HasPrev bool `json:"hasPrev,omitempty"` +} + +// ParsePagination reads and validates pagination query parameters from the request. +func ParsePagination(c *gin.Context) (PaginationParams, error) { + limit := defaultLimit + offset := 0 + + if rawLimit := strings.TrimSpace(c.Query("limit")); rawLimit != "" { + parsedLimit, err := strconv.Atoi(rawLimit) + if err != nil || parsedLimit < 1 { + return PaginationParams{}, &PaginationError{Message: "limit must be a positive integer"} + } + if parsedLimit > maxLimit { + parsedLimit = maxLimit + } + limit = parsedLimit + } + + if rawOffset := strings.TrimSpace(c.Query("offset")); rawOffset != "" { + parsedOffset, err := strconv.Atoi(rawOffset) + if err != nil || parsedOffset < 0 { + return PaginationParams{}, &PaginationError{Message: "offset must be a non-negative integer"} + } + offset = parsedOffset + } + + return PaginationParams{Limit: limit, Offset: offset}, nil +} + +// PaginationError is returned when pagination inputs are invalid. +type PaginationError struct { + Message string +} + +func (e *PaginationError) Error() string { + return e.Message +} + +// PaginationErrorResponse writes a standardized 400 response for pagination errors. +func PaginationErrorResponse(c *gin.Context, err error) { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) +} + // isValidSlug checks non-empty slug, length <= maxSlugLength (in runes), and allows Unicode letters/digits plus hyphen. func isValidSlug(s string) bool { if s == "" { @@ -80,29 +142,3 @@ func sqlNullJSONFromRaw(raw json.RawMessage) sql.NullString { } return sql.NullString{String: s, Valid: true} } - -func formatDBTimeToRFC3339(raw string) string { - s := strings.TrimSpace(raw) - if s == "" { - return "" - } - - // MySQL commonly returns "YYYY-MM-DD HH:MM:SS" or with fractional seconds. - // Some drivers/configs may return RFC3339. - layouts := []string{ - "2006-01-02 15:04:05", - "2006-01-02 15:04:05.999999", - "2006-01-02 15:04:05.999999999", - time.RFC3339Nano, - time.RFC3339, - } - - for _, layout := range layouts { - if t, err := time.Parse(layout, s); err == nil { - return t.Format(time.RFC3339) - } - } - - // Fallback: return original string instead of a wrong timestamp. - return s -} diff --git a/internal/api/handlers/data_collector.go b/internal/api/handlers/data_collector.go index fb3b4f9..06d285d 100644 --- a/internal/api/handlers/data_collector.go +++ b/internal/api/handlers/data_collector.go @@ -47,7 +47,12 @@ type DataCollectorResponse struct { // DataCollectorListResponse represents the response for listing data collectors. type DataCollectorListResponse struct { - DataCollectors []DataCollectorResponse `json:"data_collectors"` + Items []DataCollectorResponse `json:"items"` + Total int `json:"total"` + Limit int `json:"limit"` + Offset int `json:"offset"` + HasNext bool `json:"hasNext,omitempty"` + HasPrev bool `json:"hasPrev,omitempty"` } // CreateDataCollectorRequest represents the request body for creating a data collector. @@ -91,8 +96,8 @@ type dataCollectorRow struct { Certification sql.NullString `db:"certification"` Status string `db:"status"` Metadata sql.NullString `db:"metadata"` - CreatedAt sql.NullString `db:"created_at"` - UpdatedAt sql.NullString `db:"updated_at"` + CreatedAt sql.NullTime `db:"created_at"` + UpdatedAt sql.NullTime `db:"updated_at"` } func dcMetadataFromDB(ns sql.NullString) interface{} { @@ -113,11 +118,11 @@ func dataCollectorResponseFromRow(dc dataCollectorRow) DataCollectorResponse { } createdAt := "" if dc.CreatedAt.Valid { - createdAt = dc.CreatedAt.String + createdAt = dc.CreatedAt.Time.UTC().Format(time.RFC3339) } updatedAt := "" if dc.UpdatedAt.Valid { - updatedAt = dc.UpdatedAt.String + updatedAt = dc.UpdatedAt.Time.UTC().Format(time.RFC3339) } return DataCollectorResponse{ ID: fmt.Sprintf("%d", dc.ID), @@ -135,18 +140,42 @@ func dataCollectorResponseFromRow(dc dataCollectorRow) DataCollectorResponse { // ListDataCollectors handles data collector listing requests with filtering. // // @Summary List data collectors -// @Description Lists data collectors with optional filtering by status +// @Description Lists all data collectors with optional status filtering // @Tags data_collectors // @Accept json // @Produce json // @Param status query string false "Filter by status (active, inactive, on_leave)" +// @Param limit query int false "Max results (default 50, max 100)" +// @Param offset query int false "Pagination offset (default 0)" // @Success 200 {object} DataCollectorListResponse +// @Failure 400 {object} map[string]string // @Failure 500 {object} map[string]string // @Router /data_collectors [get] func (h *DataCollectorHandler) ListDataCollectors(c *gin.Context) { + pagination, err := ParsePagination(c) + if err != nil { + PaginationErrorResponse(c, err) + return + } + status := c.Query("status") - // Build query with optional filters + whereClause := "WHERE dc.deleted_at IS NULL" + args := []interface{}{} + + if status != "" { + whereClause += " AND dc.status = ?" + args = append(args, status) + } + + countQuery := "SELECT COUNT(*) FROM data_collectors dc " + whereClause + var total int + if err := h.db.Get(&total, countQuery, args...); err != nil { + logger.Printf("[DC] Failed to count data collectors: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list data collectors"}) + return + } + query := ` SELECT dc.id, @@ -159,20 +188,15 @@ func (h *DataCollectorHandler) ListDataCollectors(c *gin.Context) { dc.created_at, dc.updated_at FROM data_collectors dc - WHERE dc.deleted_at IS NULL + ` + whereClause + ` + ORDER BY dc.id DESC + LIMIT ? OFFSET ? ` - args := []interface{}{} - - if status != "" { - query += " AND dc.status = ?" - args = append(args, status) - } - query += " ORDER BY dc.id DESC" + queryArgs := append(args, pagination.Limit, pagination.Offset) - // Use db.Select for cleaner code and automatic resource management var dbRows []dataCollectorRow - if err := h.db.Select(&dbRows, query, args...); err != nil { + if err := h.db.Select(&dbRows, query, queryArgs...); err != nil { logger.Printf("[DC] Failed to query data collectors: %v", err) c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list data collectors"}) return @@ -183,8 +207,16 @@ func (h *DataCollectorHandler) ListDataCollectors(c *gin.Context) { dataCollectors = append(dataCollectors, dataCollectorResponseFromRow(dc)) } + hasNext := (pagination.Offset + pagination.Limit) < total + hasPrev := pagination.Offset > 0 + c.JSON(http.StatusOK, DataCollectorListResponse{ - DataCollectors: dataCollectors, + Items: dataCollectors, + Total: total, + Limit: pagination.Limit, + Offset: pagination.Offset, + HasNext: hasNext, + HasPrev: hasPrev, }) } @@ -236,8 +268,7 @@ func (h *DataCollectorHandler) CreateDataCollector(c *gin.Context) { return } - // Generate created_at timestamp in application layer - createdAt := time.Now().UTC().Format("2006-01-02 15:04:05") + now := time.Now().UTC() // Insert the data collector var emailStr sql.NullString @@ -292,8 +323,8 @@ func (h *DataCollectorHandler) CreateDataCollector(c *gin.Context) { certStr, "active", metadataStr, - createdAt, - createdAt, + now, + now, ) if err != nil { logger.Printf("[DC] Failed to insert data collector: %v", err) @@ -531,9 +562,8 @@ func (h *DataCollectorHandler) UpdateDataCollector(c *gin.Context) { return } - updatedAt := time.Now().UTC().Format("2006-01-02 15:04:05") updates = append(updates, "updated_at = ?") - args = append(args, updatedAt) + args = append(args, time.Now().UTC()) args = append(args, id) query := fmt.Sprintf("UPDATE data_collectors SET %s WHERE id = ? AND deleted_at IS NULL", strings.Join(updates, ", ")) @@ -624,10 +654,8 @@ func (h *DataCollectorHandler) DeleteDataCollector(c *gin.Context) { return } - updatedAt := time.Now().UTC().Format("2006-01-02 15:04:05") - // Perform soft delete by setting deleted_at - _, err = h.db.Exec("UPDATE data_collectors SET deleted_at = NOW(), updated_at = ? WHERE id = ? AND deleted_at IS NULL", updatedAt, id) + _, err = h.db.Exec("UPDATE data_collectors SET deleted_at = NOW(), updated_at = ? WHERE id = ? AND deleted_at IS NULL", time.Now().UTC(), id) if err != nil { logger.Printf("[DC] Failed to delete data collector: %v", err) c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete data collector"}) diff --git a/internal/api/handlers/episode.go b/internal/api/handlers/episode.go index cadff8a..8655590 100644 --- a/internal/api/handlers/episode.go +++ b/internal/api/handlers/episode.go @@ -31,10 +31,18 @@ func NewEpisodeHandler(db *sqlx.DB) *EpisodeHandler { } } -// Episode represents an episode in the database +// episodeRow represents an episode row from the database. type episodeRow struct { - ID string `db:"episode_id"` + ID int64 `db:"id"` + EpisodeID string `db:"episode_id"` TaskID int64 `db:"task_id"` + TaskPublicID sql.NullString `db:"task_public_id"` + SopSlug sql.NullString `db:"sop_slug"` + SopVersion sql.NullString `db:"sop_version"` + SceneName sql.NullString `db:"scene_name"` + SubsceneName sql.NullString `db:"subscene_name"` + RobotDeviceID sql.NullString `db:"robot_device_id"` + CollectorOperator sql.NullString `db:"collector_operator_id"` McapPath string `db:"mcap_path"` SidecarPath string `db:"sidecar_path"` Checksum sql.NullString `db:"checksum"` @@ -52,8 +60,16 @@ type episodeRow struct { // Episode represents an episode in the API response type Episode struct { - ID string `json:"id"` + ID int64 `json:"id"` + EpisodeID string `json:"episode_id,omitempty"` TaskID int64 `json:"task_id"` + TaskPublicID *string `json:"task_public_id,omitempty"` + SopSlug *string `json:"sop_slug"` + SopVersion *string `json:"sop_version"` + SceneName *string `json:"scene_name"` + SubsceneName *string `json:"subscene_name"` + RobotDeviceID *string `json:"robot_device_id"` + CollectorOperator *string `json:"collector_operator_id"` McapPath string `json:"mcap_path"` SidecarPath string `json:"sidecar_path"` Checksum *string `json:"checksum"` @@ -71,16 +87,18 @@ type Episode struct { // EpisodeListResponse represents the response for listing episodes type EpisodeListResponse struct { - Episodes []Episode `json:"episodes"` - Total int `json:"total"` - Limit int `json:"limit"` - Offset int `json:"offset"` + Items []Episode `json:"items"` + Total int `json:"total"` + Limit int `json:"limit"` + Offset int `json:"offset"` + HasNext bool `json:"hasNext,omitempty"` + HasPrev bool `json:"hasPrev,omitempty"` } // RegisterRoutes registers episode-related routes func (h *EpisodeHandler) RegisterRoutes(apiV1 *gin.RouterGroup) { apiV1.GET("", h.ListEpisodes) - apiV1.GET(":id", h.GetEpisode) + apiV1.GET("/:id", h.GetEpisode) } func nullableString(value sql.NullString) *string { @@ -163,8 +181,14 @@ func (h *EpisodeHandler) ListEpisodes(c *gin.Context) { // Build the base query query := ` SELECT - e.episode_id, + e.id, e.task_id as task_id, + s.slug AS sop_slug, + s.version AS sop_version, + t.scene_name AS scene_name, + t.subscene_name AS subscene_name, + r.device_id AS robot_device_id, + dc.operator_id AS collector_operator_id, e.mcap_path, e.sidecar_path, COALESCE(e.qa_status, '') as qa_status, @@ -174,6 +198,11 @@ func (h *EpisodeHandler) ListEpisodes(c *gin.Context) { e.created_at, e.labels FROM episodes e + LEFT JOIN tasks t ON t.id = e.task_id AND t.deleted_at IS NULL + LEFT JOIN sops s ON s.id = t.sop_id AND s.deleted_at IS NULL + LEFT JOIN workstations ws ON ws.id = e.workstation_id AND ws.deleted_at IS NULL + LEFT JOIN robots r ON r.id = ws.robot_id AND r.deleted_at IS NULL + LEFT JOIN data_collectors dc ON dc.id = ws.data_collector_id AND dc.deleted_at IS NULL WHERE e.deleted_at IS NULL ` @@ -257,7 +286,15 @@ func (h *EpisodeHandler) ListEpisodes(c *gin.Context) { for i, r := range rows { episodes[i] = Episode{ ID: r.ID, + EpisodeID: r.EpisodeID, TaskID: r.TaskID, + TaskPublicID: nullableString(r.TaskPublicID), + SopSlug: nullableString(r.SopSlug), + SopVersion: nullableString(r.SopVersion), + SceneName: nullableString(r.SceneName), + SubsceneName: nullableString(r.SubsceneName), + RobotDeviceID: nullableString(r.RobotDeviceID), + CollectorOperator: nullableString(r.CollectorOperator), McapPath: r.McapPath, SidecarPath: r.SidecarPath, Checksum: nullableString(r.Checksum), @@ -274,34 +311,46 @@ func (h *EpisodeHandler) ListEpisodes(c *gin.Context) { } } + hasNext := (offset + limit) < total + hasPrev := offset > 0 + // Return response c.JSON(http.StatusOK, EpisodeListResponse{ - Episodes: episodes, - Total: total, - Limit: limit, - Offset: offset, + Items: episodes, + Total: total, + Limit: limit, + Offset: offset, + HasNext: hasNext, + HasPrev: hasPrev, }) } -// GetEpisode returns episode details by episode ID. +// GetEpisode returns episode details by episode numeric ID. // // @Summary Get episode details // @Description Returns an episode by ID // @Tags episodes // @Produce json -// @Param id path string true "Episode ID" +// @Param id path int true "Episode ID" // @Success 200 {object} Episode // @Failure 404 {object} map[string]string // @Failure 500 {object} map[string]string // @Router /episodes/{id} [get] func (h *EpisodeHandler) GetEpisode(c *gin.Context) { - episodeID := c.Param("id") + episodeIDStr := c.Param("id") + episodeID, err := strconv.ParseInt(strings.TrimSpace(episodeIDStr), 10, 64) + if err != nil || episodeID <= 0 { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid episode id"}) + return + } var row episodeRow query := ` SELECT + e.id, e.episode_id, e.task_id AS task_id, + t.task_id AS task_public_id, e.mcap_path, e.sidecar_path, e.checksum, @@ -316,13 +365,14 @@ func (h *EpisodeHandler) GetEpisode(c *gin.Context) { e.created_at, e.labels FROM episodes e + LEFT JOIN tasks t ON t.id = e.task_id AND t.deleted_at IS NULL LEFT JOIN inspections i ON i.episode_id = e.id LEFT JOIN inspectors ins ON ins.id = i.inspector_id - WHERE e.episode_id = ? AND e.deleted_at IS NULL + WHERE e.id = ? AND e.deleted_at IS NULL LIMIT 1 ` - err := h.db.Get(&row, query, episodeID) + err = h.db.Get(&row, query, episodeID) if err == sql.ErrNoRows { c.JSON(http.StatusNotFound, gin.H{"error": "episode not found"}) return @@ -336,7 +386,15 @@ func (h *EpisodeHandler) GetEpisode(c *gin.Context) { c.JSON(http.StatusOK, Episode{ ID: row.ID, + EpisodeID: row.EpisodeID, TaskID: row.TaskID, + TaskPublicID: nullableString(row.TaskPublicID), + SopSlug: nullableString(row.SopSlug), + SopVersion: nullableString(row.SopVersion), + SceneName: nullableString(row.SceneName), + SubsceneName: nullableString(row.SubsceneName), + RobotDeviceID: nullableString(row.RobotDeviceID), + CollectorOperator: nullableString(row.CollectorOperator), McapPath: row.McapPath, SidecarPath: row.SidecarPath, Checksum: nullableString(row.Checksum), diff --git a/internal/api/handlers/factory.go b/internal/api/handlers/factory.go index 3c8d147..c81daba 100644 --- a/internal/api/handlers/factory.go +++ b/internal/api/handlers/factory.go @@ -88,8 +88,8 @@ type factoryRow struct { Timezone sql.NullString `db:"timezone"` Settings sql.NullString `db:"settings"` SceneCount int `db:"scene_count"` - CreatedAt sql.NullString `db:"created_at"` - UpdatedAt sql.NullString `db:"updated_at"` + CreatedAt sql.NullTime `db:"created_at"` + UpdatedAt sql.NullTime `db:"updated_at"` } func factorySettingsFromDB(ns sql.NullString) interface{} { @@ -99,7 +99,7 @@ func factorySettingsFromDB(ns sql.NullString) interface{} { return json.RawMessage(ns.String) } -// ListFactories handles factory listing requests with filtering. +// ListFactories handles factory listing requests with filtering and pagination. // // @Summary List factories // @Description Lists factories with optional filtering by organization_id @@ -107,12 +107,42 @@ func factorySettingsFromDB(ns sql.NullString) interface{} { // @Accept json // @Produce json // @Param organization_id query string false "Filter by organization ID" -// @Success 200 {object} FactoryListResponse +// @Param limit query int false "Max results (default 50, max 100)" +// @Param offset query int false "Pagination offset (default 0)" +// @Success 200 {object} ListResponse +// @Failure 400 {object} map[string]string // @Failure 500 {object} map[string]string // @Router /factories [get] func (h *FactoryHandler) ListFactories(c *gin.Context) { + pagination, err := ParsePagination(c) + if err != nil { + PaginationErrorResponse(c, err) + return + } + orgID := c.Query("organization_id") + whereClause := "WHERE deleted_at IS NULL" + args := []interface{}{} + + if orgID != "" { + parsedOrgID, err := strconv.ParseInt(orgID, 10, 64) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid organization_id format"}) + return + } + whereClause += " AND organization_id = ?" + args = append(args, parsedOrgID) + } + + countQuery := "SELECT COUNT(*) FROM factories " + whereClause + var total int + if err := h.db.Get(&total, countQuery, args...); err != nil { + logger.Printf("[FACTORY] Failed to count factories: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list factories"}) + return + } + query := ` SELECT id, @@ -126,27 +156,17 @@ func (h *FactoryHandler) ListFactories(c *gin.Context) { created_at, updated_at FROM factories - WHERE deleted_at IS NULL + ` + whereClause + ` + ORDER BY id DESC + LIMIT ? OFFSET ? ` - args := []interface{}{} - if orgID != "" { - parsedOrgID, err := strconv.ParseInt(orgID, 10, 64) - if err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": "invalid organization_id format"}) - return - } - query += " AND organization_id = ?" - args = append(args, parsedOrgID) - } - - query += " ORDER BY id DESC" + queryArgs := append(args, pagination.Limit, pagination.Offset) factories := []FactoryResponse{} - // Use db.Select for cleaner code and automatic resource management var dbRows []factoryRow - if err := h.db.Select(&dbRows, query, args...); err != nil { + if err := h.db.Select(&dbRows, query, queryArgs...); err != nil { logger.Printf("[FACTORY] Failed to query factories: %v", err) c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list factories"}) return @@ -163,7 +183,7 @@ func (h *FactoryHandler) ListFactories(c *gin.Context) { } createdAt := "" if f.CreatedAt.Valid { - createdAt = f.CreatedAt.String + createdAt = f.CreatedAt.Time.UTC().Format(time.RFC3339) } factories = append(factories, FactoryResponse{ @@ -179,8 +199,16 @@ func (h *FactoryHandler) ListFactories(c *gin.Context) { }) } - c.JSON(http.StatusOK, FactoryListResponse{ - Factories: factories, + hasNext := (pagination.Offset + pagination.Limit) < total + hasPrev := pagination.Offset > 0 + + c.JSON(http.StatusOK, ListResponse{ + Items: factories, + Total: total, + Limit: pagination.Limit, + Offset: pagination.Offset, + HasNext: hasNext, + HasPrev: hasPrev, }) } @@ -376,11 +404,11 @@ func (h *FactoryHandler) GetFactory(c *gin.Context) { } createdAt := "" if f.CreatedAt.Valid { - createdAt = f.CreatedAt.String + createdAt = f.CreatedAt.Time.UTC().Format(time.RFC3339) } updatedAt := "" if f.UpdatedAt.Valid { - updatedAt = f.UpdatedAt.String + updatedAt = f.UpdatedAt.Time.UTC().Format(time.RFC3339) } c.JSON(http.StatusOK, FactoryResponse{ @@ -580,11 +608,11 @@ func (h *FactoryHandler) UpdateFactory(c *gin.Context) { } createdAt := "" if f.CreatedAt.Valid { - createdAt = f.CreatedAt.String + createdAt = f.CreatedAt.Time.UTC().Format(time.RFC3339) } updatedAt := "" if f.UpdatedAt.Valid { - updatedAt = f.UpdatedAt.String + updatedAt = f.UpdatedAt.Time.UTC().Format(time.RFC3339) } c.JSON(http.StatusOK, FactoryResponse{ diff --git a/internal/api/handlers/inspector.go b/internal/api/handlers/inspector.go index a70359a..1ac6f05 100644 --- a/internal/api/handlers/inspector.go +++ b/internal/api/handlers/inspector.go @@ -45,7 +45,12 @@ type InspectorResponse struct { // InspectorListResponse represents the response for listing inspectors. type InspectorListResponse struct { - Inspectors []InspectorResponse `json:"inspectors"` + Items []InspectorResponse `json:"items"` + Total int `json:"total"` + Limit int `json:"limit"` + Offset int `json:"offset"` + HasNext bool `json:"hasNext,omitempty"` + HasPrev bool `json:"hasPrev,omitempty"` } // CreateInspectorRequest represents the request body for creating an inspector. @@ -97,24 +102,49 @@ type inspectorRow struct { CertificationLevel string `db:"certification_level"` Status string `db:"status"` Metadata sql.NullString `db:"metadata"` - CreatedAt sql.NullString `db:"created_at"` - UpdatedAt sql.NullString `db:"updated_at"` + CreatedAt sql.NullTime `db:"created_at"` + UpdatedAt sql.NullTime `db:"updated_at"` } // ListInspectors handles inspector listing requests. // // @Summary List inspectors -// @Description Lists all inspectors +// @Description Lists all inspectors with pagination // @Tags inspectors // @Accept json // @Produce json // @Param status query string false "Filter by status (active, inactive)" +// @Param limit query int false "Max results (default 50, max 100)" +// @Param offset query int false "Pagination offset (default 0)" // @Success 200 {object} InspectorListResponse +// @Failure 400 {object} map[string]string // @Failure 500 {object} map[string]string // @Router /inspectors [get] func (h *InspectorHandler) ListInspectors(c *gin.Context) { + pagination, err := ParsePagination(c) + if err != nil { + PaginationErrorResponse(c, err) + return + } + status := c.Query("status") + whereClause := "WHERE deleted_at IS NULL" + args := []interface{}{} + + if status != "" { + whereClause += " AND status = ?" + args = append(args, status) + } + + countQuery := "SELECT COUNT(*) FROM inspectors " + whereClause + var total int + if err := h.db.Get(&total, countQuery, args...); err != nil { + logger.Printf("[INSPECTOR] Failed to count inspectors: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list inspectors"}) + return + } + query := ` SELECT id, @@ -127,19 +157,15 @@ func (h *InspectorHandler) ListInspectors(c *gin.Context) { created_at, updated_at FROM inspectors - WHERE deleted_at IS NULL + ` + whereClause + ` + ORDER BY id DESC + LIMIT ? OFFSET ? ` - args := []interface{}{} - if status != "" { - query += " AND status = ?" - args = append(args, status) - } - - query += " ORDER BY id DESC" + queryArgs := append(args, pagination.Limit, pagination.Offset) var dbRows []inspectorRow - if err := h.db.Select(&dbRows, query, args...); err != nil { + if err := h.db.Select(&dbRows, query, queryArgs...); err != nil { logger.Printf("[INSPECTOR] Failed to query inspectors: %v", err) c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list inspectors"}) return @@ -161,11 +187,11 @@ func (h *InspectorHandler) ListInspectors(c *gin.Context) { } createdAt := "" if i.CreatedAt.Valid { - createdAt = i.CreatedAt.String + createdAt = i.CreatedAt.Time.UTC().Format(time.RFC3339) } updatedAt := "" if i.UpdatedAt.Valid { - updatedAt = i.UpdatedAt.String + updatedAt = i.UpdatedAt.Time.UTC().Format(time.RFC3339) } inspectors = append(inspectors, InspectorResponse{ @@ -181,8 +207,16 @@ func (h *InspectorHandler) ListInspectors(c *gin.Context) { }) } + hasNext := (pagination.Offset + pagination.Limit) < total + hasPrev := pagination.Offset > 0 + c.JSON(http.StatusOK, InspectorListResponse{ - Inspectors: inspectors, + Items: inspectors, + Total: total, + Limit: pagination.Limit, + Offset: pagination.Offset, + HasNext: hasNext, + HasPrev: hasPrev, }) } @@ -247,11 +281,11 @@ func (h *InspectorHandler) GetInspector(c *gin.Context) { } createdAt := "" if i.CreatedAt.Valid { - createdAt = i.CreatedAt.String + createdAt = i.CreatedAt.Time.UTC().Format(time.RFC3339) } updatedAt := "" if i.UpdatedAt.Valid { - updatedAt = i.UpdatedAt.String + updatedAt = i.UpdatedAt.Time.UTC().Format(time.RFC3339) } c.JSON(http.StatusOK, InspectorResponse{ @@ -555,11 +589,11 @@ func (h *InspectorHandler) UpdateInspector(c *gin.Context) { } createdAt := "" if i.CreatedAt.Valid { - createdAt = i.CreatedAt.String + createdAt = i.CreatedAt.Time.UTC().Format(time.RFC3339) } updatedAt := "" if i.UpdatedAt.Valid { - updatedAt = i.UpdatedAt.String + updatedAt = i.UpdatedAt.Time.UTC().Format(time.RFC3339) } c.JSON(http.StatusOK, InspectorResponse{ diff --git a/internal/api/handlers/order.go b/internal/api/handlers/order.go index 2c71349..837bf06 100644 --- a/internal/api/handlers/order.go +++ b/internal/api/handlers/order.go @@ -44,13 +44,35 @@ func (h *OrderHandler) RegisterRoutes(apiV1 *gin.RouterGroup) { // OrderListResponse is the response body for listing orders. type OrderListResponse struct { - Orders []OrderResponse `json:"orders"` + Items []OrderListItemResponse `json:"items"` + Total int `json:"total"` + Limit int `json:"limit"` + Offset int `json:"offset"` + HasNext bool `json:"hasNext,omitempty"` + HasPrev bool `json:"hasPrev,omitempty"` +} + +// OrderListItemResponse is the response body for an order item in list views. +// It intentionally excludes task statistics to keep ListOrders fast. +type OrderListItemResponse struct { + ID string `json:"id"` + SceneID string `json:"scene_id"` + SceneName string `json:"scene_name,omitempty"` + Name string `json:"name"` + TargetCount int `json:"target_count"` + Status string `json:"status"` + Priority string `json:"priority"` + Deadline string `json:"deadline,omitempty"` + Metadata any `json:"metadata,omitempty"` + CreatedAt string `json:"created_at,omitempty"` + UpdatedAt string `json:"updated_at,omitempty"` } // OrderResponse is the response body for a single order. type OrderResponse struct { ID string `json:"id"` SceneID string `json:"scene_id"` + SceneName string `json:"scene_name,omitempty"` Name string `json:"name"` TargetCount int `json:"target_count"` TaskCount int `json:"task_count"` @@ -91,6 +113,7 @@ type UpdateOrderRequest struct { type orderRow struct { ID int64 `db:"id"` SceneID int64 `db:"scene_id"` + SceneName sql.NullString `db:"scene_name"` Name string `db:"name"` TargetCount int `db:"target_count"` TaskCount int `db:"task_count"` @@ -101,8 +124,22 @@ type orderRow struct { Priority string `db:"priority"` Deadline sql.NullTime `db:"deadline"` Metadata sql.NullString `db:"metadata"` - CreatedAt sql.NullString `db:"created_at"` - UpdatedAt sql.NullString `db:"updated_at"` + CreatedAt sql.NullTime `db:"created_at"` + UpdatedAt sql.NullTime `db:"updated_at"` +} + +type orderListRow struct { + ID int64 `db:"id"` + SceneID int64 `db:"scene_id"` + SceneName sql.NullString `db:"scene_name"` + Name string `db:"name"` + TargetCount int `db:"target_count"` + Status string `db:"status"` + Priority string `db:"priority"` + Deadline sql.NullTime `db:"deadline"` + Metadata sql.NullString `db:"metadata"` + CreatedAt sql.NullTime `db:"created_at"` + UpdatedAt sql.NullTime `db:"updated_at"` } var validOrderPriorities = map[string]struct{}{ @@ -120,12 +157,40 @@ var validOrderStatuses = map[string]struct{}{ "cancelled": {}, } -// ListOrders returns all non-deleted orders. +// ListOrders returns all non-deleted orders with pagination. func (h *OrderHandler) ListOrders(c *gin.Context) { + pagination, err := ParsePagination(c) + if err != nil { + PaginationErrorResponse(c, err) + return + } + + status := strings.TrimSpace(c.Query("status")) + if status != "" { + if _, ok := validOrderStatuses[status]; !ok { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid status"}) + return + } + } + + countQuery := "SELECT COUNT(*) FROM orders WHERE deleted_at IS NULL" + countArgs := []any{} + if status != "" { + countQuery += " AND status = ?" + countArgs = append(countArgs, status) + } + var total int + if err := h.db.Get(&total, countQuery, countArgs...); err != nil { + logger.Printf("[ORDER] Failed to count orders: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list orders"}) + return + } + query := ` SELECT o.id, o.scene_id, + s.name AS scene_name, o.name, o.target_count, o.status, @@ -133,32 +198,38 @@ func (h *OrderHandler) ListOrders(c *gin.Context) { o.deadline, CAST(o.metadata AS CHAR) AS metadata, o.created_at, - o.updated_at, - (SELECT COUNT(*) FROM tasks t WHERE t.order_id = o.id AND t.deleted_at IS NULL) AS task_count, - (SELECT COUNT(*) FROM tasks t WHERE t.order_id = o.id AND t.status = 'completed' AND t.deleted_at IS NULL) AS completed_count, - (SELECT COUNT(*) FROM tasks t WHERE t.order_id = o.id AND t.status = 'cancelled' AND t.deleted_at IS NULL) AS cancelled_count, - (SELECT COUNT(*) FROM tasks t WHERE t.order_id = o.id AND t.status = 'failed' AND t.deleted_at IS NULL) AS failed_count + o.updated_at FROM orders o + LEFT JOIN scenes s ON s.id = o.scene_id AND s.deleted_at IS NULL WHERE o.deleted_at IS NULL + ` + args := []any{} + if status != "" { + query += " AND o.status = ?\n" + args = append(args, status) + } + query += ` ORDER BY o.id DESC + LIMIT ? OFFSET ? ` + args = append(args, pagination.Limit, pagination.Offset) - var rows []orderRow - if err := h.db.Select(&rows, query); err != nil { + var rows []orderListRow + if err := h.db.Select(&rows, query, args...); err != nil { logger.Printf("[ORDER] Failed to query orders: %v", err) c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list orders"}) return } - orders := make([]OrderResponse, 0, len(rows)) + orders := make([]OrderListItemResponse, 0, len(rows)) for _, r := range rows { createdAt := "" if r.CreatedAt.Valid { - createdAt = r.CreatedAt.String + createdAt = r.CreatedAt.Time.UTC().Format(time.RFC3339) } updatedAt := "" if r.UpdatedAt.Valid { - updatedAt = r.UpdatedAt.String + updatedAt = r.UpdatedAt.Time.UTC().Format(time.RFC3339) } deadline := "" if r.Deadline.Valid { @@ -171,25 +242,36 @@ func (h *OrderHandler) ListOrders(c *gin.Context) { metadata = v } } - orders = append(orders, OrderResponse{ - ID: fmt.Sprintf("%d", r.ID), - SceneID: fmt.Sprintf("%d", r.SceneID), - Name: r.Name, - TargetCount: r.TargetCount, - TaskCount: r.TaskCount, - CompletedCount: r.CompletedCount, - CancelledCount: r.CancelledCount, - FailedCount: r.FailedCount, - Status: r.Status, - Priority: r.Priority, - Deadline: deadline, - Metadata: metadata, - CreatedAt: createdAt, - UpdatedAt: updatedAt, + sceneName := "" + if r.SceneName.Valid { + sceneName = r.SceneName.String + } + orders = append(orders, OrderListItemResponse{ + ID: fmt.Sprintf("%d", r.ID), + SceneID: fmt.Sprintf("%d", r.SceneID), + SceneName: sceneName, + Name: r.Name, + TargetCount: r.TargetCount, + Status: r.Status, + Priority: r.Priority, + Deadline: deadline, + Metadata: metadata, + CreatedAt: createdAt, + UpdatedAt: updatedAt, }) } - c.JSON(http.StatusOK, OrderListResponse{Orders: orders}) + hasNext := (pagination.Offset + pagination.Limit) < total + hasPrev := pagination.Offset > 0 + + c.JSON(http.StatusOK, OrderListResponse{ + Items: orders, + Total: total, + Limit: pagination.Limit, + Offset: pagination.Offset, + HasNext: hasNext, + HasPrev: hasPrev, + }) } // GetOrder returns a single order by id. @@ -205,6 +287,7 @@ func (h *OrderHandler) GetOrder(c *gin.Context) { SELECT o.id, o.scene_id, + s.name AS scene_name, o.name, o.target_count, o.status, @@ -213,12 +296,26 @@ func (h *OrderHandler) GetOrder(c *gin.Context) { CAST(o.metadata AS CHAR) AS metadata, o.created_at, o.updated_at, - (SELECT COUNT(*) FROM tasks t WHERE t.order_id = o.id AND t.deleted_at IS NULL) AS task_count, - (SELECT COUNT(*) FROM tasks t WHERE t.order_id = o.id AND t.status = 'completed' AND t.deleted_at IS NULL) AS completed_count, - (SELECT COUNT(*) FROM tasks t WHERE t.order_id = o.id AND t.status = 'cancelled' AND t.deleted_at IS NULL) AS cancelled_count, - (SELECT COUNT(*) FROM tasks t WHERE t.order_id = o.id AND t.status = 'failed' AND t.deleted_at IS NULL) AS failed_count + COUNT(t.id) AS task_count, + SUM(CASE WHEN t.status = 'completed' THEN 1 ELSE 0 END) AS completed_count, + SUM(CASE WHEN t.status = 'cancelled' THEN 1 ELSE 0 END) AS cancelled_count, + SUM(CASE WHEN t.status = 'failed' THEN 1 ELSE 0 END) AS failed_count FROM orders o + LEFT JOIN scenes s ON s.id = o.scene_id AND s.deleted_at IS NULL + LEFT JOIN tasks t ON t.order_id = o.id AND t.deleted_at IS NULL WHERE o.id = ? AND o.deleted_at IS NULL + GROUP BY + o.id, + o.scene_id, + s.name, + o.name, + o.target_count, + o.status, + o.priority, + o.deadline, + o.metadata, + o.created_at, + o.updated_at ` var r orderRow @@ -234,11 +331,11 @@ func (h *OrderHandler) GetOrder(c *gin.Context) { createdAt := "" if r.CreatedAt.Valid { - createdAt = r.CreatedAt.String + createdAt = r.CreatedAt.Time.UTC().Format(time.RFC3339) } updatedAt := "" if r.UpdatedAt.Valid { - updatedAt = r.UpdatedAt.String + updatedAt = r.UpdatedAt.Time.UTC().Format(time.RFC3339) } deadline := "" if r.Deadline.Valid { @@ -251,10 +348,15 @@ func (h *OrderHandler) GetOrder(c *gin.Context) { metadata = v } } + sceneName := "" + if r.SceneName.Valid { + sceneName = r.SceneName.String + } c.JSON(http.StatusOK, OrderResponse{ ID: fmt.Sprintf("%d", r.ID), SceneID: fmt.Sprintf("%d", r.SceneID), + SceneName: sceneName, Name: r.Name, TargetCount: r.TargetCount, TaskCount: r.TaskCount, @@ -338,12 +440,13 @@ func (h *OrderHandler) CreateOrder(c *gin.Context) { // Derive organization_id from scene -> factory -> organization var organizationID int64 - err = h.db.Get(&organizationID, ` - SELECT f.organization_id + var sceneName string + err = h.db.QueryRowx(` + SELECT f.organization_id, s.name FROM scenes s JOIN factories f ON f.id = s.factory_id WHERE s.id = ? AND s.deleted_at IS NULL AND f.deleted_at IS NULL - `, sceneID) + `, sceneID).Scan(&organizationID, &sceneName) if err != nil { if err == sql.ErrNoRows { c.JSON(http.StatusBadRequest, gin.H{"error": "scene not found"}) @@ -407,6 +510,7 @@ func (h *OrderHandler) CreateOrder(c *gin.Context) { c.JSON(http.StatusCreated, OrderResponse{ ID: fmt.Sprintf("%d", id), SceneID: fmt.Sprintf("%d", sceneID), + SceneName: sceneName, Name: req.Name, TargetCount: req.TargetCount, TaskCount: 0, @@ -453,28 +557,9 @@ func (h *OrderHandler) UpdateOrder(c *gin.Context) { args := []interface{}{} if req.SceneID != nil { - sceneIDStr := strings.TrimSpace(*req.SceneID) - if sceneIDStr == "" { - c.JSON(http.StatusBadRequest, gin.H{"error": "scene_id cannot be empty"}) - return - } - sceneID, err := strconv.ParseInt(sceneIDStr, 10, 64) - if err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": "invalid scene_id format"}) - return - } - var sceneExists bool - if err := h.db.Get(&sceneExists, "SELECT EXISTS(SELECT 1 FROM scenes WHERE id = ? AND deleted_at IS NULL)", sceneID); err != nil { - logger.Printf("[ORDER] Failed to verify scene: %v", err) - c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update order"}) - return - } - if !sceneExists { - c.JSON(http.StatusBadRequest, gin.H{"error": "scene not found"}) - return - } - updates = append(updates, "scene_id = ?") - args = append(args, sceneID) + // scene_id is immutable after creation: changing it would invalidate existing batches/tasks/subscenes. + c.JSON(http.StatusBadRequest, gin.H{"error": "scene_id is immutable and cannot be updated"}) + return } var autoStatusFromTarget *string diff --git a/internal/api/handlers/organization.go b/internal/api/handlers/organization.go index 1a74ffd..22cfbdf 100644 --- a/internal/api/handlers/organization.go +++ b/internal/api/handlers/organization.go @@ -43,7 +43,12 @@ type OrganizationResponse struct { // OrganizationListResponse represents the response for listing organizations. type OrganizationListResponse struct { - Organizations []OrganizationResponse `json:"organizations"` + Items []OrganizationResponse `json:"items"` + Total int `json:"total"` + Limit int `json:"limit"` + Offset int `json:"offset"` + HasNext bool `json:"hasNext,omitempty"` + HasPrev bool `json:"hasPrev,omitempty"` } // CreateOrganizationRequest represents the request body for creating an organization. @@ -105,22 +110,39 @@ type organizationRow struct { Slug string `db:"slug"` Description sql.NullString `db:"description"` Settings sql.NullString `db:"settings"` - CreatedAt sql.NullString `db:"created_at"` - UpdatedAt sql.NullString `db:"updated_at"` + CreatedAt sql.NullTime `db:"created_at"` + UpdatedAt sql.NullTime `db:"updated_at"` FactoryCount int `db:"factory_count"` } // ListOrganizations handles organization listing requests. // // @Summary List organizations -// @Description Lists all organizations +// @Description Lists all organizations with pagination // @Tags organizations // @Accept json // @Produce json +// @Param limit query int false "Max results (default 50, max 100)" +// @Param offset query int false "Pagination offset (default 0)" // @Success 200 {object} OrganizationListResponse +// @Failure 400 {object} map[string]string // @Failure 500 {object} map[string]string // @Router /organizations [get] func (h *OrganizationHandler) ListOrganizations(c *gin.Context) { + pagination, err := ParsePagination(c) + if err != nil { + PaginationErrorResponse(c, err) + return + } + + countQuery := "SELECT COUNT(*) FROM organizations WHERE deleted_at IS NULL" + var total int + if err := h.db.Get(&total, countQuery); err != nil { + logger.Printf("[ORGANIZATION] Failed to count organizations: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list organizations"}) + return + } + query := ` SELECT o.id, @@ -135,10 +157,11 @@ func (h *OrganizationHandler) ListOrganizations(c *gin.Context) { FROM organizations o WHERE o.deleted_at IS NULL ORDER BY o.id DESC + LIMIT ? OFFSET ? ` var dbRows []organizationRow - if err := h.db.Select(&dbRows, query); err != nil { + if err := h.db.Select(&dbRows, query, pagination.Limit, pagination.Offset); err != nil { logger.Printf("[ORGANIZATION] Failed to query organizations: %v", err) c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list organizations"}) return @@ -158,12 +181,12 @@ func (h *OrganizationHandler) ListOrganizations(c *gin.Context) { createdAt := "" if org.CreatedAt.Valid { - createdAt = org.CreatedAt.String + createdAt = org.CreatedAt.Time.UTC().Format(time.RFC3339) } updatedAt := "" if org.UpdatedAt.Valid { - updatedAt = org.UpdatedAt.String + updatedAt = org.UpdatedAt.Time.UTC().Format(time.RFC3339) } organizations = append(organizations, OrganizationResponse{ @@ -178,8 +201,16 @@ func (h *OrganizationHandler) ListOrganizations(c *gin.Context) { }) } + hasNext := (pagination.Offset + pagination.Limit) < total + hasPrev := pagination.Offset > 0 + c.JSON(http.StatusOK, OrganizationListResponse{ - Organizations: organizations, + Items: organizations, + Total: total, + Limit: pagination.Limit, + Offset: pagination.Offset, + HasNext: hasNext, + HasPrev: hasPrev, }) } @@ -242,12 +273,12 @@ func (h *OrganizationHandler) GetOrganization(c *gin.Context) { createdAt := "" if org.CreatedAt.Valid { - createdAt = org.CreatedAt.String + createdAt = org.CreatedAt.Time.UTC().Format(time.RFC3339) } updatedAt := "" if org.UpdatedAt.Valid { - updatedAt = org.UpdatedAt.String + updatedAt = org.UpdatedAt.Time.UTC().Format(time.RFC3339) } c.JSON(http.StatusOK, OrganizationResponse{ @@ -509,12 +540,12 @@ func (h *OrganizationHandler) UpdateOrganization(c *gin.Context) { createdAt := "" if org.CreatedAt.Valid { - createdAt = org.CreatedAt.String + createdAt = org.CreatedAt.Time.UTC().Format(time.RFC3339) } updatedAt := "" if org.UpdatedAt.Valid { - updatedAt = org.UpdatedAt.String + updatedAt = org.UpdatedAt.Time.UTC().Format(time.RFC3339) } c.JSON(http.StatusOK, OrganizationResponse{ diff --git a/internal/api/handlers/robot.go b/internal/api/handlers/robot.go index 195d82c..edd8791 100644 --- a/internal/api/handlers/robot.go +++ b/internal/api/handlers/robot.go @@ -54,7 +54,12 @@ type RobotResponse struct { // RobotListResponse represents the response for listing robots. type RobotListResponse struct { - Robots []RobotResponse `json:"robots"` + Items []RobotResponse `json:"items"` + Total int `json:"total"` + Limit int `json:"limit"` + Offset int `json:"offset"` + HasNext bool `json:"hasNext,omitempty"` + HasPrev bool `json:"hasPrev,omitempty"` } // DeviceConnectionResponse is an in-memory connection snapshot keyed by Axon device_id (no database access). @@ -107,8 +112,8 @@ type robotRow struct { AssetID sql.NullString `db:"asset_id"` Status string `db:"status"` Metadata sql.NullString `db:"metadata"` - CreatedAt sql.NullString `db:"created_at"` - UpdatedAt sql.NullString `db:"updated_at"` + CreatedAt sql.NullTime `db:"created_at"` + UpdatedAt sql.NullTime `db:"updated_at"` } func robotMetadataFromDB(ns sql.NullString) interface{} { @@ -150,11 +155,11 @@ func (h *RobotHandler) connectionStateDetailed(deviceID string) (connected bool, func (h *RobotHandler) responseFromRow(r robotRow) RobotResponse { createdAt := "" if r.CreatedAt.Valid { - createdAt = r.CreatedAt.String + createdAt = r.CreatedAt.Time.UTC().Format(time.RFC3339) } updatedAt := "" if r.UpdatedAt.Valid { - updatedAt = r.UpdatedAt.String + updatedAt = r.UpdatedAt.Time.UTC().Format(time.RFC3339) } assetID := "" if r.AssetID.Valid { @@ -187,10 +192,19 @@ func (h *RobotHandler) responseFromRow(r robotRow) RobotResponse { // @Param status query string false "Filter by status (active, maintenance, retired)" // @Param robot_type_id query string false "Filter by robot type id" // @Param connected query string false "Filter by connection status (true/false)" +// @Param limit query int false "Max results (default 50, max 100)" +// @Param offset query int false "Pagination offset (default 0)" // @Success 200 {object} RobotListResponse +// @Failure 400 {object} map[string]string // @Failure 500 {object} map[string]string // @Router /robots [get] func (h *RobotHandler) ListRobots(c *gin.Context) { + pagination, err := ParsePagination(c) + if err != nil { + PaginationErrorResponse(c, err) + return + } + factoryID := c.Query("factory_id") status := c.Query("status") robotTypeID := c.Query("robot_type_id") @@ -206,21 +220,7 @@ func (h *RobotHandler) ListRobots(c *gin.Context) { connectedFilter = &connected } - // Build query with optional filters - query := ` - SELECT - r.id, - r.robot_type_id, - r.device_id, - r.factory_id, - r.asset_id, - r.status, - r.metadata, - r.created_at, - r.updated_at - FROM robots r - WHERE r.deleted_at IS NULL - ` + whereClause := "WHERE r.deleted_at IS NULL" args := []interface{}{} if factoryID != "" { @@ -229,31 +229,54 @@ func (h *RobotHandler) ListRobots(c *gin.Context) { c.JSON(http.StatusBadRequest, gin.H{"error": "invalid factory_id format"}) return } - query += " AND r.factory_id = ?" + whereClause += " AND r.factory_id = ?" args = append(args, parsedFactoryID) } if status != "" { - query += " AND r.status = ?" + whereClause += " AND r.status = ?" args = append(args, status) } if robotTypeID != "" { - // Parse robot_type_id as numeric value parsedRobotTypeID, err := strconv.ParseInt(robotTypeID, 10, 64) if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": "invalid robot_type_id format"}) return } - query += " AND r.robot_type_id = ?" + whereClause += " AND r.robot_type_id = ?" args = append(args, parsedRobotTypeID) } - query += " ORDER BY r.id DESC" + countQuery := "SELECT COUNT(*) FROM robots r " + whereClause + var total int + if err := h.db.Get(&total, countQuery, args...); err != nil { + logger.Printf("[ROBOT] Failed to count robots: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list robots"}) + return + } + + query := ` + SELECT + r.id, + r.robot_type_id, + r.device_id, + r.factory_id, + r.asset_id, + r.status, + r.metadata, + r.created_at, + r.updated_at + FROM robots r + ` + whereClause + ` + ORDER BY r.id DESC + LIMIT ? OFFSET ? + ` + + queryArgs := append(args, pagination.Limit, pagination.Offset) - // Use db.Select for cleaner code and automatic resource management var dbRows []robotRow - if err := h.db.Select(&dbRows, query, args...); err != nil { + if err := h.db.Select(&dbRows, query, queryArgs...); err != nil { logger.Printf("[ROBOT] Failed to query robots: %v", err) c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list robots"}) return @@ -268,8 +291,16 @@ func (h *RobotHandler) ListRobots(c *gin.Context) { robots = append(robots, resp) } + hasNext := (pagination.Offset + pagination.Limit) < total + hasPrev := pagination.Offset > 0 + c.JSON(http.StatusOK, RobotListResponse{ - Robots: robots, + Items: robots, + Total: total, + Limit: pagination.Limit, + Offset: pagination.Offset, + HasNext: hasNext, + HasPrev: hasPrev, }) } @@ -340,8 +371,7 @@ func (h *RobotHandler) CreateRobot(c *gin.Context) { return } - // Generate created_at timestamp in application layer - createdAt := time.Now().UTC().Format("2006-01-02 15:04:05") + now := time.Now().UTC() var assetIDStr sql.NullString if a := strings.TrimSpace(req.AssetID); a != "" { @@ -376,8 +406,8 @@ func (h *RobotHandler) CreateRobot(c *gin.Context) { assetIDStr, "active", metadataStr, - createdAt, - createdAt, + now, + now, ) if err != nil { logger.Printf("[ROBOT] Failed to insert robot: %v", err) @@ -657,7 +687,7 @@ func (h *RobotHandler) UpdateRobot(c *gin.Context) { } updates = append(updates, "updated_at = ?") - args = append(args, time.Now().UTC().Format("2006-01-02 15:04:05")) + args = append(args, time.Now().UTC()) args = append(args, id) query := fmt.Sprintf("UPDATE robots SET %s WHERE id = ? AND deleted_at IS NULL", strings.Join(updates, ", ")) @@ -741,10 +771,8 @@ func (h *RobotHandler) DeleteRobot(c *gin.Context) { return } - updatedAt := time.Now().UTC().Format("2006-01-02 15:04:05") - // Perform soft delete by setting deleted_at - _, err = h.db.Exec("UPDATE robots SET deleted_at = NOW(), updated_at = ? WHERE id = ? AND deleted_at IS NULL", updatedAt, id) + _, err = h.db.Exec("UPDATE robots SET deleted_at = NOW(), updated_at = ? WHERE id = ? AND deleted_at IS NULL", time.Now().UTC(), id) if err != nil { logger.Printf("[ROBOT] Failed to delete robot: %v", err) c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to delete robot"}) diff --git a/internal/api/handlers/robot_type.go b/internal/api/handlers/robot_type.go index 5ad2b99..4e4807f 100644 --- a/internal/api/handlers/robot_type.go +++ b/internal/api/handlers/robot_type.go @@ -56,7 +56,12 @@ type RobotTypeResponse struct { // RobotTypeListResponse represents the response for listing robot types. type RobotTypeListResponse struct { - RobotTypes []RobotTypeResponse `json:"robot_types"` + Items []RobotTypeResponse `json:"items"` + Total int `json:"total"` + Limit int `json:"limit"` + Offset int `json:"offset"` + HasNext bool `json:"hasNext,omitempty"` + HasPrev bool `json:"hasPrev,omitempty"` } // RegisterRoutes registers robot type related routes. @@ -78,8 +83,8 @@ type robotTypeRow struct { SensorSuite sql.NullString `db:"sensor_suite"` ROSTopics sql.NullString `db:"ros_topics"` Capabilities sql.NullString `db:"capabilities"` - CreatedAt sql.NullString `db:"created_at"` - UpdatedAt sql.NullString `db:"updated_at"` + CreatedAt sql.NullTime `db:"created_at"` + UpdatedAt sql.NullTime `db:"updated_at"` } func sqlNullStringFromOptionalPtr(s *string) sql.NullString { @@ -123,12 +128,12 @@ func robotTypeRowToResponse(rt robotTypeRow) RobotTypeResponse { createdAt := "" if rt.CreatedAt.Valid { - createdAt = rt.CreatedAt.String + createdAt = rt.CreatedAt.Time.UTC().Format(time.RFC3339) } updatedAt := "" if rt.UpdatedAt.Valid { - updatedAt = rt.UpdatedAt.String + updatedAt = rt.UpdatedAt.Time.UTC().Format(time.RFC3339) } return RobotTypeResponse{ @@ -240,23 +245,41 @@ func (h *RobotTypeHandler) CreateRobotType(c *gin.Context) { // ListRobotTypes handles robot type listing requests. // // @Summary List robot types -// @Description Lists all robot types +// @Description Lists all robot types with pagination // @Tags robot_types // @Accept json // @Produce json +// @Param limit query int false "Max results (default 50, max 100)" +// @Param offset query int false "Pagination offset (default 0)" // @Success 200 {object} RobotTypeListResponse +// @Failure 400 {object} map[string]string // @Failure 500 {object} map[string]string // @Router /robot_types [get] func (h *RobotTypeHandler) ListRobotTypes(c *gin.Context) { + pagination, err := ParsePagination(c) + if err != nil { + PaginationErrorResponse(c, err) + return + } + + countQuery := "SELECT COUNT(*) FROM robot_types WHERE deleted_at IS NULL" + var total int + if err := h.db.Get(&total, countQuery); err != nil { + logger.Printf("[ROBOT] Failed to count robot types: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list robot types"}) + return + } + query := ` SELECT ` + robotTypeSelectColumns + ` FROM robot_types WHERE deleted_at IS NULL ORDER BY id DESC + LIMIT ? OFFSET ? ` var dbRows []robotTypeRow - if err := h.db.Select(&dbRows, query); err != nil { + if err := h.db.Select(&dbRows, query, pagination.Limit, pagination.Offset); err != nil { logger.Printf("[ROBOT] Failed to query robot types: %v", err) c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list robot types"}) return @@ -267,8 +290,16 @@ func (h *RobotTypeHandler) ListRobotTypes(c *gin.Context) { robotTypes = append(robotTypes, robotTypeRowToResponse(rt)) } + hasNext := (pagination.Offset + pagination.Limit) < total + hasPrev := pagination.Offset > 0 + c.JSON(http.StatusOK, RobotTypeListResponse{ - RobotTypes: robotTypes, + Items: robotTypes, + Total: total, + Limit: pagination.Limit, + Offset: pagination.Offset, + HasNext: hasNext, + HasPrev: hasPrev, }) } diff --git a/internal/api/handlers/scene.go b/internal/api/handlers/scene.go index 8556409..f9921d7 100644 --- a/internal/api/handlers/scene.go +++ b/internal/api/handlers/scene.go @@ -42,7 +42,12 @@ type SceneResponse struct { // SceneListResponse represents the response for listing scenes. type SceneListResponse struct { - Scenes []SceneResponse `json:"scenes"` + Items []SceneResponse `json:"items"` + Total int `json:"total"` + Limit int `json:"limit"` + Offset int `json:"offset"` + HasNext bool `json:"hasNext,omitempty"` + HasPrev bool `json:"hasPrev,omitempty"` } // CreateSceneRequest represents the request body for creating a scene. @@ -85,8 +90,8 @@ type sceneRow struct { Description sql.NullString `db:"description"` InitialSceneLayoutTemplate sql.NullString `db:"initial_scene_layout_template"` SubsceneCount int `db:"subscene_count"` - CreatedAt sql.NullString `db:"created_at"` - UpdatedAt sql.NullString `db:"updated_at"` + CreatedAt sql.NullTime `db:"created_at"` + UpdatedAt sql.NullTime `db:"updated_at"` } // ListScenes handles scene listing requests with filtering. @@ -97,12 +102,42 @@ type sceneRow struct { // @Accept json // @Produce json // @Param factory_id query string false "Filter by factory ID" +// @Param limit query int false "Max results (default 50, max 100)" +// @Param offset query int false "Pagination offset (default 0)" // @Success 200 {object} SceneListResponse +// @Failure 400 {object} map[string]string // @Failure 500 {object} map[string]string // @Router /scenes [get] func (h *SceneHandler) ListScenes(c *gin.Context) { + pagination, err := ParsePagination(c) + if err != nil { + PaginationErrorResponse(c, err) + return + } + factoryID := c.Query("factory_id") + whereClause := "WHERE s.deleted_at IS NULL" + args := []interface{}{} + + if factoryID != "" { + parsedFactoryID, err := strconv.ParseInt(factoryID, 10, 64) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid factory_id format"}) + return + } + whereClause += " AND factory_id = ?" + args = append(args, parsedFactoryID) + } + + countQuery := "SELECT COUNT(*) FROM scenes s " + whereClause + var total int + if err := h.db.Get(&total, countQuery, args...); err != nil { + logger.Printf("[SCENE] Failed to count scenes: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list scenes"}) + return + } + query := ` SELECT s.id, @@ -114,24 +149,15 @@ func (h *SceneHandler) ListScenes(c *gin.Context) { s.updated_at, (SELECT COUNT(*) FROM subscenes sub WHERE sub.scene_id = s.id AND sub.deleted_at IS NULL) as subscene_count FROM scenes s - WHERE s.deleted_at IS NULL + ` + whereClause + ` + ORDER BY id DESC + LIMIT ? OFFSET ? ` - args := []interface{}{} - if factoryID != "" { - parsedFactoryID, err := strconv.ParseInt(factoryID, 10, 64) - if err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": "invalid factory_id format"}) - return - } - query += " AND factory_id = ?" - args = append(args, parsedFactoryID) - } - - query += " ORDER BY id DESC" + queryArgs := append(args, pagination.Limit, pagination.Offset) var dbRows []sceneRow - if err := h.db.Select(&dbRows, query, args...); err != nil { + if err := h.db.Select(&dbRows, query, queryArgs...); err != nil { logger.Printf("[SCENE] Failed to query scenes: %v", err) c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list scenes"}) return @@ -149,11 +175,11 @@ func (h *SceneHandler) ListScenes(c *gin.Context) { } createdAt := "" if s.CreatedAt.Valid { - createdAt = s.CreatedAt.String + createdAt = s.CreatedAt.Time.UTC().Format(time.RFC3339) } updatedAt := "" if s.UpdatedAt.Valid { - updatedAt = s.UpdatedAt.String + updatedAt = s.UpdatedAt.Time.UTC().Format(time.RFC3339) } scenes = append(scenes, SceneResponse{ ID: fmt.Sprintf("%d", s.ID), @@ -167,8 +193,16 @@ func (h *SceneHandler) ListScenes(c *gin.Context) { }) } + hasNext := (pagination.Offset + pagination.Limit) < total + hasPrev := pagination.Offset > 0 + c.JSON(http.StatusOK, SceneListResponse{ - Scenes: scenes, + Items: scenes, + Total: total, + Limit: pagination.Limit, + Offset: pagination.Offset, + HasNext: hasNext, + HasPrev: hasPrev, }) } @@ -227,11 +261,11 @@ func (h *SceneHandler) GetScene(c *gin.Context) { } createdAt := "" if s.CreatedAt.Valid { - createdAt = s.CreatedAt.String + createdAt = s.CreatedAt.Time.UTC().Format(time.RFC3339) } updatedAt := "" if s.UpdatedAt.Valid { - updatedAt = s.UpdatedAt.String + updatedAt = s.UpdatedAt.Time.UTC().Format(time.RFC3339) } c.JSON(http.StatusOK, SceneResponse{ ID: fmt.Sprintf("%d", s.ID), @@ -478,11 +512,11 @@ func (h *SceneHandler) UpdateScene(c *gin.Context) { } createdAt := "" if s.CreatedAt.Valid { - createdAt = s.CreatedAt.String + createdAt = s.CreatedAt.Time.UTC().Format(time.RFC3339) } updatedAt := "" if s.UpdatedAt.Valid { - updatedAt = s.UpdatedAt.String + updatedAt = s.UpdatedAt.Time.UTC().Format(time.RFC3339) } c.JSON(http.StatusOK, SceneResponse{ ID: fmt.Sprintf("%d", s.ID), diff --git a/internal/api/handlers/skill.go b/internal/api/handlers/skill.go index 83fa430..e3675f3 100644 --- a/internal/api/handlers/skill.go +++ b/internal/api/handlers/skill.go @@ -60,7 +60,12 @@ type SkillResponse struct { // SkillListResponse represents the response for listing skills. type SkillListResponse struct { - Skills []SkillResponse `json:"skills"` + Items []SkillResponse `json:"items"` + Total int `json:"total"` + Limit int `json:"limit"` + Offset int `json:"offset"` + HasNext bool `json:"hasNext,omitempty"` + HasPrev bool `json:"hasPrev,omitempty"` } // CreateSkillRequest represents the request body for creating a skill. @@ -104,21 +109,38 @@ type skillRow struct { Description sql.NullString `db:"description"` Version sql.NullString `db:"version"` Metadata sql.NullString `db:"metadata"` - CreatedAt sql.NullString `db:"created_at"` - UpdatedAt sql.NullString `db:"updated_at"` + CreatedAt sql.NullTime `db:"created_at"` + UpdatedAt sql.NullTime `db:"updated_at"` } // ListSkills handles skill listing requests. // // @Summary List skills -// @Description Lists all skills +// @Description Lists all skills with pagination // @Tags skills // @Accept json // @Produce json +// @Param limit query int false "Max results (default 50, max 100)" +// @Param offset query int false "Pagination offset (default 0)" // @Success 200 {object} SkillListResponse +// @Failure 400 {object} map[string]string // @Failure 500 {object} map[string]string // @Router /skills [get] func (h *SkillHandler) ListSkills(c *gin.Context) { + pagination, err := ParsePagination(c) + if err != nil { + PaginationErrorResponse(c, err) + return + } + + countQuery := "SELECT COUNT(*) FROM skills WHERE deleted_at IS NULL" + var total int + if err := h.db.Get(&total, countQuery); err != nil { + logger.Printf("[SKILL] Failed to count skills: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list skills"}) + return + } + query := ` SELECT id, @@ -131,10 +153,11 @@ func (h *SkillHandler) ListSkills(c *gin.Context) { FROM skills WHERE deleted_at IS NULL ORDER BY id DESC + LIMIT ? OFFSET ? ` var dbRows []skillRow - if err := h.db.Select(&dbRows, query); err != nil { + if err := h.db.Select(&dbRows, query, pagination.Limit, pagination.Offset); err != nil { logger.Printf("[SKILL] Failed to query skills: %v", err) c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list skills"}) return @@ -156,11 +179,11 @@ func (h *SkillHandler) ListSkills(c *gin.Context) { } createdAt := "" if s.CreatedAt.Valid { - createdAt = s.CreatedAt.String + createdAt = s.CreatedAt.Time.UTC().Format(time.RFC3339) } updatedAt := "" if s.UpdatedAt.Valid { - updatedAt = s.UpdatedAt.String + updatedAt = s.UpdatedAt.Time.UTC().Format(time.RFC3339) } skills = append(skills, SkillResponse{ @@ -174,8 +197,16 @@ func (h *SkillHandler) ListSkills(c *gin.Context) { }) } + hasNext := (pagination.Offset + pagination.Limit) < total + hasPrev := pagination.Offset > 0 + c.JSON(http.StatusOK, SkillListResponse{ - Skills: skills, + Items: skills, + Total: total, + Limit: pagination.Limit, + Offset: pagination.Offset, + HasNext: hasNext, + HasPrev: hasPrev, }) } @@ -238,11 +269,11 @@ func (h *SkillHandler) GetSkill(c *gin.Context) { } createdAt := "" if s.CreatedAt.Valid { - createdAt = s.CreatedAt.String + createdAt = s.CreatedAt.Time.UTC().Format(time.RFC3339) } updatedAt := "" if s.UpdatedAt.Valid { - updatedAt = s.UpdatedAt.String + updatedAt = s.UpdatedAt.Time.UTC().Format(time.RFC3339) } c.JSON(http.StatusOK, SkillResponse{ @@ -397,38 +428,24 @@ func (h *SkillHandler) UpdateSkill(c *gin.Context) { return } - // Build update query dynamically + // Build update query dynamically (id-scoped updates only; slug rename is handled separately). updates := []string{} args := []interface{}{} + renameSlug := false + oldSlug := current.Slug + newSlug := "" if req.Slug != nil { - slug := strings.TrimSpace(*req.Slug) - if slug == "" { + newSlug = strings.TrimSpace(*req.Slug) + if newSlug == "" { c.JSON(http.StatusBadRequest, gin.H{"error": "slug cannot be empty"}) return } - if !isValidSlug(slug) { + if !isValidSlug(newSlug) { c.JSON(http.StatusBadRequest, gin.H{"error": invalidSlugUserMessage}) return } - if slug != current.Slug { - // New slug: reset version to 1.0.0. - targetVersion := "1.0.0" - - var exists bool - err := h.db.Get(&exists, "SELECT EXISTS(SELECT 1 FROM skills WHERE slug = ? AND id != ? AND version = ? AND deleted_at IS NULL)", slug, id, targetVersion) - if err == nil && exists { - c.JSON(http.StatusBadRequest, gin.H{"error": "slug already exists for this version"}) - return - } - - updates = append(updates, "slug = ?", "version = ?") - args = append(args, slug, targetVersion) - } else { - // Same slug in payload: no version reset. - updates = append(updates, "slug = ?") - args = append(args, slug) - } + renameSlug = newSlug != oldSlug } if req.Description != nil { @@ -467,28 +484,78 @@ func (h *SkillHandler) UpdateSkill(c *gin.Context) { } } - if len(updates) == 0 { + // It's valid to rename a slug even if no id-scoped fields are being updated. + if !renameSlug && len(updates) == 0 { c.JSON(http.StatusBadRequest, gin.H{"error": "no fields to update"}) return } now := time.Now().UTC() - updates = append(updates, "updated_at = ?") - args = append(args, now) - args = append(args, id) - - query := fmt.Sprintf("UPDATE skills SET %s WHERE id = ? AND deleted_at IS NULL", strings.Join(updates, ", ")) - - result, err := h.db.Exec(query, args...) + tx, err := h.db.Beginx() if err != nil { - logger.Printf("[SKILL] Failed to update skill: %v", err) + logger.Printf("[SKILL] Failed to begin transaction: %v", err) c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update skill"}) return } + defer func() { _ = tx.Rollback() }() - rowsAffected, _ := result.RowsAffected() - if rowsAffected == 0 { - c.JSON(http.StatusNotFound, gin.H{"error": "skill not found"}) + if renameSlug { + // Do not allow rename if target slug already exists (any version). + var exists bool + if err := tx.Get(&exists, "SELECT EXISTS(SELECT 1 FROM skills WHERE slug = ? AND deleted_at IS NULL)", newSlug); err != nil { + logger.Printf("[SKILL] Failed to check target slug existence: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update skill"}) + return + } + if exists { + c.JSON(http.StatusBadRequest, gin.H{"error": "target slug already exists"}) + return + } + + // Rename all versions under the same old slug. + if _, err := tx.Exec( + "UPDATE skills SET slug = ?, updated_at = ? WHERE slug = ? AND deleted_at IS NULL", + newSlug, now, oldSlug, + ); err != nil { + logger.Printf("[SKILL] Failed to rename skill slug: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update skill"}) + return + } + } + + if len(updates) > 0 { + updates = append(updates, "updated_at = ?") + args = append(args, now) + args = append(args, id) + + query := fmt.Sprintf("UPDATE skills SET %s WHERE id = ? AND deleted_at IS NULL", strings.Join(updates, ", ")) + result, err := tx.Exec(query, args...) + if err != nil { + logger.Printf("[SKILL] Failed to update skill: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update skill"}) + return + } + + rowsAffected, _ := result.RowsAffected() + if rowsAffected == 0 { + // MySQL may report 0 rows affected when the row matches but values are unchanged. + // Re-check existence to avoid false 404 responses on no-op updates. + var stillExists bool + if err := tx.Get(&stillExists, "SELECT EXISTS(SELECT 1 FROM skills WHERE id = ? AND deleted_at IS NULL)", id); err != nil { + logger.Printf("[SKILL] Failed to re-check skill existence: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update skill"}) + return + } + if !stillExists { + c.JSON(http.StatusNotFound, gin.H{"error": "skill not found"}) + return + } + } + } + + if err := tx.Commit(); err != nil { + logger.Printf("[SKILL] Failed to commit transaction: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update skill"}) return } @@ -515,11 +582,11 @@ func (h *SkillHandler) UpdateSkill(c *gin.Context) { } createdAt := "" if s.CreatedAt.Valid { - createdAt = s.CreatedAt.String + createdAt = s.CreatedAt.Time.UTC().Format(time.RFC3339) } updatedAt := "" if s.UpdatedAt.Valid { - updatedAt = s.UpdatedAt.String + updatedAt = s.UpdatedAt.Time.UTC().Format(time.RFC3339) } c.JSON(http.StatusOK, SkillResponse{ diff --git a/internal/api/handlers/sop.go b/internal/api/handlers/sop.go index 24b935f..1698109 100644 --- a/internal/api/handlers/sop.go +++ b/internal/api/handlers/sop.go @@ -42,7 +42,12 @@ type SOPResponse struct { // SOPListResponse represents the response for listing SOPs. type SOPListResponse struct { - SOPs []SOPResponse `json:"sops"` + Items []SOPResponse `json:"items"` + Total int `json:"total"` + Limit int `json:"limit"` + Offset int `json:"offset"` + HasNext bool `json:"hasNext,omitempty"` + HasPrev bool `json:"hasPrev,omitempty"` } // CreateSOPRequest represents the request body for creating an SOP. @@ -86,21 +91,38 @@ type sopRow struct { Description sql.NullString `db:"description"` SkillSequence string `db:"skill_sequence"` Version sql.NullString `db:"version"` - CreatedAt sql.NullString `db:"created_at"` - UpdatedAt sql.NullString `db:"updated_at"` + CreatedAt sql.NullTime `db:"created_at"` + UpdatedAt sql.NullTime `db:"updated_at"` } // ListSOPs handles SOP listing requests. // // @Summary List SOPs -// @Description Lists all SOPs +// @Description Lists all SOPs with pagination // @Tags sops // @Accept json // @Produce json +// @Param limit query int false "Max results (default 50, max 100)" +// @Param offset query int false "Pagination offset (default 0)" // @Success 200 {object} SOPListResponse +// @Failure 400 {object} map[string]string // @Failure 500 {object} map[string]string // @Router /sops [get] func (h *SOPHandler) ListSOPs(c *gin.Context) { + pagination, err := ParsePagination(c) + if err != nil { + PaginationErrorResponse(c, err) + return + } + + countQuery := "SELECT COUNT(*) FROM sops WHERE deleted_at IS NULL" + var total int + if err := h.db.Get(&total, countQuery); err != nil { + logger.Printf("[SOP] Failed to count SOPs: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list SOPs"}) + return + } + query := ` SELECT id, @@ -113,10 +135,11 @@ func (h *SOPHandler) ListSOPs(c *gin.Context) { FROM sops WHERE deleted_at IS NULL ORDER BY id DESC + LIMIT ? OFFSET ? ` var dbRows []sopRow - if err := h.db.Select(&dbRows, query); err != nil { + if err := h.db.Select(&dbRows, query, pagination.Limit, pagination.Offset); err != nil { logger.Printf("[SOP] Failed to query SOPs: %v", err) c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list SOPs"}) return @@ -135,11 +158,11 @@ func (h *SOPHandler) ListSOPs(c *gin.Context) { } createdAt := "" if s.CreatedAt.Valid { - createdAt = s.CreatedAt.String + createdAt = s.CreatedAt.Time.UTC().Format(time.RFC3339) } updatedAt := "" if s.UpdatedAt.Valid { - updatedAt = s.UpdatedAt.String + updatedAt = s.UpdatedAt.Time.UTC().Format(time.RFC3339) } sops = append(sops, SOPResponse{ @@ -153,8 +176,16 @@ func (h *SOPHandler) ListSOPs(c *gin.Context) { }) } + hasNext := (pagination.Offset + pagination.Limit) < total + hasPrev := pagination.Offset > 0 + c.JSON(http.StatusOK, SOPListResponse{ - SOPs: sops, + Items: sops, + Total: total, + Limit: pagination.Limit, + Offset: pagination.Offset, + HasNext: hasNext, + HasPrev: hasPrev, }) } @@ -214,11 +245,11 @@ func (h *SOPHandler) GetSOP(c *gin.Context) { } createdAt := "" if s.CreatedAt.Valid { - createdAt = s.CreatedAt.String + createdAt = s.CreatedAt.Time.UTC().Format(time.RFC3339) } updatedAt := "" if s.UpdatedAt.Valid { - updatedAt = s.UpdatedAt.String + updatedAt = s.UpdatedAt.Time.UTC().Format(time.RFC3339) } c.JSON(http.StatusOK, SOPResponse{ @@ -378,38 +409,24 @@ func (h *SOPHandler) UpdateSOP(c *gin.Context) { return } - // Build update query dynamically + // Build update query dynamically (id-scoped updates only; slug rename is handled separately). updates := []string{} args := []interface{}{} + renameSlug := false + oldSlug := current.Slug + newSlug := "" if req.Slug != nil { - slug := strings.TrimSpace(*req.Slug) - if slug == "" { + newSlug = strings.TrimSpace(*req.Slug) + if newSlug == "" { c.JSON(http.StatusBadRequest, gin.H{"error": "slug cannot be empty"}) return } - if !isValidSlug(slug) { + if !isValidSlug(newSlug) { c.JSON(http.StatusBadRequest, gin.H{"error": invalidSlugUserMessage}) return } - if slug != current.Slug { - // New slug: reset version to 1.0.0. - targetVersion := "1.0.0" - - var exists bool - err := h.db.Get(&exists, "SELECT EXISTS(SELECT 1 FROM sops WHERE slug = ? AND id != ? AND version = ? AND deleted_at IS NULL)", slug, id, targetVersion) - if err == nil && exists { - c.JSON(http.StatusBadRequest, gin.H{"error": "slug already exists for this version"}) - return - } - - updates = append(updates, "slug = ?", "version = ?") - args = append(args, slug, targetVersion) - } else { - // Same slug in payload: no version reset. - updates = append(updates, "slug = ?") - args = append(args, slug) - } + renameSlug = newSlug != oldSlug } if req.Description != nil { @@ -445,28 +462,78 @@ func (h *SOPHandler) UpdateSOP(c *gin.Context) { } } - if len(updates) == 0 { + // It's valid to rename a slug even if no id-scoped fields are being updated. + if !renameSlug && len(updates) == 0 { c.JSON(http.StatusBadRequest, gin.H{"error": "no fields to update"}) return } now := time.Now().UTC() - updates = append(updates, "updated_at = ?") - args = append(args, now) - args = append(args, id) - - query := fmt.Sprintf("UPDATE sops SET %s WHERE id = ? AND deleted_at IS NULL", strings.Join(updates, ", ")) - - result, err := h.db.Exec(query, args...) + tx, err := h.db.Beginx() if err != nil { - logger.Printf("[SOP] Failed to update SOP: %v", err) + logger.Printf("[SOP] Failed to begin transaction: %v", err) c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update SOP"}) return } + defer func() { _ = tx.Rollback() }() - rowsAffected, _ := result.RowsAffected() - if rowsAffected == 0 { - c.JSON(http.StatusNotFound, gin.H{"error": "SOP not found"}) + if renameSlug { + // Do not allow rename if target slug already exists (any version). + var exists bool + if err := tx.Get(&exists, "SELECT EXISTS(SELECT 1 FROM sops WHERE slug = ? AND deleted_at IS NULL)", newSlug); err != nil { + logger.Printf("[SOP] Failed to check target slug existence: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update SOP"}) + return + } + if exists { + c.JSON(http.StatusBadRequest, gin.H{"error": "target slug already exists"}) + return + } + + // Rename all versions under the same old slug. + if _, err := tx.Exec( + "UPDATE sops SET slug = ?, updated_at = ? WHERE slug = ? AND deleted_at IS NULL", + newSlug, now, oldSlug, + ); err != nil { + logger.Printf("[SOP] Failed to rename SOP slug: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update SOP"}) + return + } + } + + if len(updates) > 0 { + updates = append(updates, "updated_at = ?") + args = append(args, now) + args = append(args, id) + + query := fmt.Sprintf("UPDATE sops SET %s WHERE id = ? AND deleted_at IS NULL", strings.Join(updates, ", ")) + result, err := tx.Exec(query, args...) + if err != nil { + logger.Printf("[SOP] Failed to update SOP: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update SOP"}) + return + } + + rowsAffected, _ := result.RowsAffected() + if rowsAffected == 0 { + // MySQL may report 0 rows affected when the row matches but values are unchanged. + // Re-check existence to avoid false 404 responses on no-op updates. + var stillExists bool + if err := tx.Get(&stillExists, "SELECT EXISTS(SELECT 1 FROM sops WHERE id = ? AND deleted_at IS NULL)", id); err != nil { + logger.Printf("[SOP] Failed to re-check SOP existence: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update SOP"}) + return + } + if !stillExists { + c.JSON(http.StatusNotFound, gin.H{"error": "SOP not found"}) + return + } + } + } + + if err := tx.Commit(); err != nil { + logger.Printf("[SOP] Failed to commit transaction: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to update SOP"}) return } @@ -490,11 +557,11 @@ func (h *SOPHandler) UpdateSOP(c *gin.Context) { } createdAt := "" if s.CreatedAt.Valid { - createdAt = s.CreatedAt.String + createdAt = s.CreatedAt.Time.UTC().Format(time.RFC3339) } updatedAt := "" if s.UpdatedAt.Valid { - updatedAt = s.UpdatedAt.String + updatedAt = s.UpdatedAt.Time.UTC().Format(time.RFC3339) } c.JSON(http.StatusOK, SOPResponse{ diff --git a/internal/api/handlers/station.go b/internal/api/handlers/station.go index a1d3e5c..198cba2 100644 --- a/internal/api/handlers/station.go +++ b/internal/api/handlers/station.go @@ -248,8 +248,7 @@ func (h *StationHandler) CreateStation(c *gin.Context) { return } - // Generate created_at timestamp - createdAt := time.Now().UTC().Format("2006-01-02 15:04:05") + now := time.Now().UTC() metadataStr := sql.NullString{String: "{}", Valid: true} if req.Metadata != nil { @@ -288,8 +287,8 @@ func (h *StationHandler) CreateStation(c *gin.Context) { req.Name, "offline", metadataStr, - createdAt, - createdAt, + now, + now, ) if err != nil { logger.Printf("[STATION] Failed to insert workstation: %v", err) @@ -304,9 +303,6 @@ func (h *StationHandler) CreateStation(c *gin.Context) { return } - // Format created_at for response in ISO 8601 - createdAtISO, _ := time.Parse("2006-01-02 15:04:05", createdAt) - var metaOut interface{} if metadataStr.Valid { metaOut = stationMetadataFromDB(metadataStr) @@ -320,8 +316,8 @@ func (h *StationHandler) CreateStation(c *gin.Context) { Status: "offline", Name: req.Name, Metadata: metaOut, - CreatedAt: createdAtISO.Format(time.RFC3339), - UpdatedAt: createdAtISO.Format(time.RFC3339), + CreatedAt: now.Format(time.RFC3339), + UpdatedAt: now.Format(time.RFC3339), }) } @@ -338,22 +334,39 @@ type stationListRow struct { Name sql.NullString `db:"name"` Status string `db:"status"` Metadata sql.NullString `db:"metadata"` - CreatedAt sql.NullString `db:"created_at"` - UpdatedAt sql.NullString `db:"updated_at"` + CreatedAt sql.NullTime `db:"created_at"` + UpdatedAt sql.NullTime `db:"updated_at"` } // ListStations handles listing all stations. // // @Summary List stations -// @Description Returns a list of all workstations +// @Description Returns a list of all workstations with pagination // @Tags stations // @Produce json -// @Success 200 {object} map[string][]StationResponse -// @Failure 500 {object} map[string]string +// @Param limit query int false "Max results (default 50, max 100)" +// @Param offset query int false "Pagination offset (default 0)" +// @Success 200 {object} ListResponse +// @Failure 400 {object} map[string]string +// @Failure 500 {object} map[string]string // @Router /stations [get] func (h *StationHandler) ListStations(c *gin.Context) { + pagination, err := ParsePagination(c) + if err != nil { + PaginationErrorResponse(c, err) + return + } + + countQuery := "SELECT COUNT(*) FROM workstations WHERE deleted_at IS NULL" + var total int + if err := h.db.Get(&total, countQuery); err != nil { + logger.Printf("[STATION] Failed to count stations: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list stations"}) + return + } + var stations []stationListRow - err := h.db.Select(&stations, ` + err = h.db.Select(&stations, ` SELECT id, robot_id, robot_name, robot_serial, data_collector_id, collector_name, collector_operator_id, @@ -361,7 +374,8 @@ func (h *StationHandler) ListStations(c *gin.Context) { FROM workstations WHERE deleted_at IS NULL ORDER BY id DESC - `) + LIMIT ? OFFSET ? + `, pagination.Limit, pagination.Offset) if err != nil && err != sql.ErrNoRows { logger.Printf("[STATION] Failed to query stations: %v", err) c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list stations"}) @@ -372,16 +386,15 @@ func (h *StationHandler) ListStations(c *gin.Context) { stations = []stationListRow{} } - // Build response response := make([]StationResponse, 0, len(stations)) for _, s := range stations { var createdAtStr string if s.CreatedAt.Valid { - createdAtStr = formatDBTimeToRFC3339(s.CreatedAt.String) + createdAtStr = s.CreatedAt.Time.UTC().Format(time.RFC3339) } var updatedAtStr string if s.UpdatedAt.Valid { - updatedAtStr = formatDBTimeToRFC3339(s.UpdatedAt.String) + updatedAtStr = s.UpdatedAt.Time.UTC().Format(time.RFC3339) } response = append(response, StationResponse{ @@ -397,7 +410,17 @@ func (h *StationHandler) ListStations(c *gin.Context) { }) } - c.JSON(http.StatusOK, gin.H{"stations": response}) + hasNext := (pagination.Offset + pagination.Limit) < total + hasPrev := pagination.Offset > 0 + + c.JSON(http.StatusOK, ListResponse{ + Items: response, + Total: total, + Limit: pagination.Limit, + Offset: pagination.Offset, + HasNext: hasNext, + HasPrev: hasPrev, + }) } const maxStationLookupIDs = 500 @@ -834,11 +857,11 @@ func (h *StationHandler) UpdateStation(c *gin.Context) { // Format response var createdAtStr string if station.CreatedAt.Valid { - createdAtStr = formatDBTimeToRFC3339(station.CreatedAt.String) + createdAtStr = station.CreatedAt.Time.UTC().Format(time.RFC3339) } var updatedAtStr string if station.UpdatedAt.Valid { - updatedAtStr = formatDBTimeToRFC3339(station.UpdatedAt.String) + updatedAtStr = station.UpdatedAt.Time.UTC().Format(time.RFC3339) } c.JSON(http.StatusOK, StationResponse{ @@ -897,11 +920,11 @@ func (h *StationHandler) GetStation(c *gin.Context) { var createdAtStr string if station.CreatedAt.Valid { - createdAtStr = formatDBTimeToRFC3339(station.CreatedAt.String) + createdAtStr = station.CreatedAt.Time.UTC().Format(time.RFC3339) } var updatedAtStr string if station.UpdatedAt.Valid { - updatedAtStr = formatDBTimeToRFC3339(station.UpdatedAt.String) + updatedAtStr = station.UpdatedAt.Time.UTC().Format(time.RFC3339) } c.JSON(http.StatusOK, StationResponse{ diff --git a/internal/api/handlers/subscene.go b/internal/api/handlers/subscene.go index 0bfc5e2..41358bc 100644 --- a/internal/api/handlers/subscene.go +++ b/internal/api/handlers/subscene.go @@ -41,7 +41,12 @@ type SubsceneResponse struct { // SubsceneListResponse represents the response for listing subscenes. type SubsceneListResponse struct { - Subscenes []SubsceneResponse `json:"subscenes"` + Items []SubsceneResponse `json:"items"` + Total int `json:"total"` + Limit int `json:"limit"` + Offset int `json:"offset"` + HasNext bool `json:"hasNext,omitempty"` + HasPrev bool `json:"hasPrev,omitempty"` } // CreateSubsceneRequest represents the request body for creating a subscene. @@ -83,8 +88,8 @@ type subsceneRow struct { Name string `db:"name"` Description sql.NullString `db:"description"` InitialSceneLayout sql.NullString `db:"initial_scene_layout"` - CreatedAt sql.NullString `db:"created_at"` - UpdatedAt sql.NullString `db:"updated_at"` + CreatedAt sql.NullTime `db:"created_at"` + UpdatedAt sql.NullTime `db:"updated_at"` } func (h *SubsceneHandler) getSceneInitialLayoutTemplate(sceneID int64) (sql.NullString, error) { @@ -105,12 +110,42 @@ func (h *SubsceneHandler) getSceneInitialLayoutTemplate(sceneID int64) (sql.Null // @Accept json // @Produce json // @Param scene_id query string false "Filter by scene ID" +// @Param limit query int false "Max results (default 50, max 100)" +// @Param offset query int false "Pagination offset (default 0)" // @Success 200 {object} SubsceneListResponse +// @Failure 400 {object} map[string]string // @Failure 500 {object} map[string]string // @Router /subscenes [get] func (h *SubsceneHandler) ListSubscenes(c *gin.Context) { + pagination, err := ParsePagination(c) + if err != nil { + PaginationErrorResponse(c, err) + return + } + sceneID := c.Query("scene_id") + whereClause := "WHERE deleted_at IS NULL" + args := []interface{}{} + + if sceneID != "" { + parsedSceneID, err := strconv.ParseInt(sceneID, 10, 64) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "invalid scene_id format"}) + return + } + whereClause += " AND scene_id = ?" + args = append(args, parsedSceneID) + } + + countQuery := "SELECT COUNT(*) FROM subscenes " + whereClause + var total int + if err := h.db.Get(&total, countQuery, args...); err != nil { + logger.Printf("[SUBSCENE] Failed to count subscenes: %v", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list subscenes"}) + return + } + query := ` SELECT id, @@ -121,24 +156,15 @@ func (h *SubsceneHandler) ListSubscenes(c *gin.Context) { created_at, updated_at FROM subscenes - WHERE deleted_at IS NULL + ` + whereClause + ` + ORDER BY id DESC + LIMIT ? OFFSET ? ` - args := []interface{}{} - if sceneID != "" { - parsedSceneID, err := strconv.ParseInt(sceneID, 10, 64) - if err != nil { - c.JSON(http.StatusBadRequest, gin.H{"error": "invalid scene_id format"}) - return - } - query += " AND scene_id = ?" - args = append(args, parsedSceneID) - } - - query += " ORDER BY id DESC" + queryArgs := append(args, pagination.Limit, pagination.Offset) var dbRows []subsceneRow - if err := h.db.Select(&dbRows, query, args...); err != nil { + if err := h.db.Select(&dbRows, query, queryArgs...); err != nil { logger.Printf("[SUBSCENE] Failed to query subscenes: %v", err) c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to list subscenes"}) return @@ -156,11 +182,11 @@ func (h *SubsceneHandler) ListSubscenes(c *gin.Context) { } createdAt := "" if s.CreatedAt.Valid { - createdAt = s.CreatedAt.String + createdAt = s.CreatedAt.Time.UTC().Format(time.RFC3339) } updatedAt := "" if s.UpdatedAt.Valid { - updatedAt = s.UpdatedAt.String + updatedAt = s.UpdatedAt.Time.UTC().Format(time.RFC3339) } subscenes = append(subscenes, SubsceneResponse{ ID: fmt.Sprintf("%d", s.ID), @@ -173,8 +199,16 @@ func (h *SubsceneHandler) ListSubscenes(c *gin.Context) { }) } + hasNext := (pagination.Offset + pagination.Limit) < total + hasPrev := pagination.Offset > 0 + c.JSON(http.StatusOK, SubsceneListResponse{ - Subscenes: subscenes, + Items: subscenes, + Total: total, + Limit: pagination.Limit, + Offset: pagination.Offset, + HasNext: hasNext, + HasPrev: hasPrev, }) } @@ -233,11 +267,11 @@ func (h *SubsceneHandler) GetSubscene(c *gin.Context) { } createdAt := "" if s.CreatedAt.Valid { - createdAt = s.CreatedAt.String + createdAt = s.CreatedAt.Time.UTC().Format(time.RFC3339) } updatedAt := "" if s.UpdatedAt.Valid { - updatedAt = s.UpdatedAt.String + updatedAt = s.UpdatedAt.Time.UTC().Format(time.RFC3339) } c.JSON(http.StatusOK, SubsceneResponse{ ID: fmt.Sprintf("%d", s.ID), @@ -520,11 +554,11 @@ func (h *SubsceneHandler) UpdateSubscene(c *gin.Context) { } createdAt := "" if s.CreatedAt.Valid { - createdAt = s.CreatedAt.String + createdAt = s.CreatedAt.Time.UTC().Format(time.RFC3339) } updatedAt := "" if s.UpdatedAt.Valid { - updatedAt = s.UpdatedAt.String + updatedAt = s.UpdatedAt.Time.UTC().Format(time.RFC3339) } c.JSON(http.StatusOK, SubsceneResponse{ ID: fmt.Sprintf("%d", s.ID), diff --git a/internal/api/handlers/task.go b/internal/api/handlers/task.go index ce8e50f..916d4cd 100644 --- a/internal/api/handlers/task.go +++ b/internal/api/handlers/task.go @@ -114,26 +114,30 @@ var validTaskStatuses = map[string]struct{}{ // TaskListItem represents a task item in list responses. type TaskListItem struct { - ID string `json:"id" db:"id"` - TaskID string `json:"task_id" db:"task_id"` - BatchID string `json:"batch_id" db:"batch_id"` - OrderID string `json:"order_id" db:"order_id"` - SOPID string `json:"sop_id" db:"sop_id"` - WorkstationID *string `json:"workstation_id" db:"workstation_id"` - SceneID string `json:"scene_id" db:"scene_id"` - SceneName string `json:"scene_name" db:"scene_name"` - SubsceneID string `json:"subscene_id" db:"subscene_id"` - SubsceneName string `json:"subscene_name" db:"subscene_name"` - Status string `json:"status" db:"status"` - AssignedAt *string `json:"assigned_at" db:"assigned_at"` + ID int64 `json:"id" db:"id"` + TaskID string `json:"task_id" db:"task_id"` + BatchID string `json:"batch_id" db:"batch_id"` + OrderID string `json:"order_id" db:"order_id"` + SOPID string `json:"sop_id" db:"sop_id"` + WorkstationID *string `json:"workstation_id" db:"workstation_id"` + RobotDeviceID *string `json:"robot_device_id" db:"robot_device_id"` + CollectorOperatorID *string `json:"collector_operator_id" db:"collector_operator_id"` + SceneID string `json:"scene_id" db:"scene_id"` + SceneName string `json:"scene_name" db:"scene_name"` + SubsceneID string `json:"subscene_id" db:"subscene_id"` + SubsceneName string `json:"subscene_name" db:"subscene_name"` + Status string `json:"status" db:"status"` + AssignedAt *string `json:"assigned_at" db:"assigned_at"` } // ListTasksResponse represents the response body for listing tasks. type ListTasksResponse struct { - Tasks []TaskListItem `json:"tasks"` - Total int `json:"total"` - Limit int `json:"limit"` - Offset int `json:"offset"` + Items []TaskListItem `json:"items"` + Total int `json:"total"` + Limit int `json:"limit"` + Offset int `json:"offset"` + HasNext bool `json:"hasNext,omitempty"` + HasPrev bool `json:"hasPrev,omitempty"` } // TaskEpisodeDetail represents the episode information attached to a task. @@ -143,7 +147,7 @@ type TaskEpisodeDetail struct { // TaskDetailResponse represents the response body for getting a task by ID. type TaskDetailResponse struct { - ID string `json:"id" db:"id"` + ID int64 `json:"id" db:"id"` TaskID string `json:"task_id" db:"task_id"` BatchID string `json:"batch_id" db:"batch_id"` BatchName string `json:"batch_name" db:"batch_name"` @@ -199,12 +203,12 @@ var validTaskStatusTransitions = map[string]map[string]struct{}{ // ListTasks handles task listing requests with optional filtering. // // @Summary List tasks -// @Description Lists tasks with optional workstation, status, and public task_id filters +// @Description Lists tasks with optional workstation and status filters // @Tags tasks // @Produce json +// @Param task_id query string false "Filter by public task_id" // @Param workstation_id query string false "Filter by workstation" // @Param status query string false "Filter by status" -// @Param task_id query string false "Filter by public task_id (exact match)" // @Param limit query int false "Max results" default(50) // @Param offset query int false "Pagination offset" default(0) // @Success 200 {object} ListTasksResponse @@ -214,9 +218,9 @@ var validTaskStatusTransitions = map[string]map[string]struct{}{ func (h *TaskHandler) ListTasks(c *gin.Context) { const defaultLimit = 50 + taskID := strings.TrimSpace(c.Query("task_id")) workstationID := strings.TrimSpace(c.Query("workstation_id")) status := strings.TrimSpace(c.Query("status")) - publicTaskID := strings.TrimSpace(c.Query("task_id")) limit := defaultLimit if rawLimit := strings.TrimSpace(c.Query("limit")); rawLimit != "" { @@ -245,24 +249,24 @@ func (h *TaskHandler) ListTasks(c *gin.Context) { } } - conditions := []string{"deleted_at IS NULL"} + conditions := []string{"tasks.deleted_at IS NULL"} args := make([]interface{}, 0, 6) + if taskID != "" { + conditions = append(conditions, "tasks.task_id = ?") + args = append(args, taskID) + } + if workstationID != "" { - conditions = append(conditions, "CAST(workstation_id AS CHAR) = ?") + conditions = append(conditions, "CAST(tasks.workstation_id AS CHAR) = ?") args = append(args, workstationID) } if status != "" { - conditions = append(conditions, "status = ?") + conditions = append(conditions, "tasks.status = ?") args = append(args, status) } - if publicTaskID != "" { - conditions = append(conditions, "task_id = ?") - args = append(args, publicTaskID) - } - whereClause := strings.Join(conditions, " AND ") var total int @@ -275,21 +279,24 @@ func (h *TaskHandler) ListTasks(c *gin.Context) { queryArgs := append(append([]interface{}{}, args...), limit, offset) listQuery := fmt.Sprintf(`SELECT - CAST(id AS CHAR) AS id, - task_id AS task_id, - CAST(batch_id AS CHAR) AS batch_id, - CAST(order_id AS CHAR) AS order_id, - CAST(sop_id AS CHAR) AS sop_id, - CASE WHEN workstation_id IS NULL THEN NULL ELSE CAST(workstation_id AS CHAR) END AS workstation_id, - CAST(scene_id AS CHAR) AS scene_id, - COALESCE(scene_name, '') AS scene_name, - CAST(subscene_id AS CHAR) AS subscene_id, - COALESCE(subscene_name, '') AS subscene_name, - status, - CASE WHEN assigned_at IS NULL THEN NULL ELSE DATE_FORMAT(CONVERT_TZ(assigned_at, @@session.time_zone, '+00:00'), '%%Y-%%m-%%dT%%H:%%i:%%sZ') END AS assigned_at + tasks.id AS id, + tasks.task_id AS task_id, + CAST(tasks.batch_id AS CHAR) AS batch_id, + CAST(tasks.order_id AS CHAR) AS order_id, + CAST(tasks.sop_id AS CHAR) AS sop_id, + CASE WHEN tasks.workstation_id IS NULL THEN NULL ELSE CAST(tasks.workstation_id AS CHAR) END AS workstation_id, + NULLIF(TRIM(COALESCE(ws.robot_serial, '')), '') AS robot_device_id, + NULLIF(TRIM(COALESCE(ws.collector_operator_id, '')), '') AS collector_operator_id, + CAST(tasks.scene_id AS CHAR) AS scene_id, + COALESCE(tasks.scene_name, '') AS scene_name, + CAST(tasks.subscene_id AS CHAR) AS subscene_id, + COALESCE(tasks.subscene_name, '') AS subscene_name, + tasks.status, + CASE WHEN tasks.assigned_at IS NULL THEN NULL ELSE DATE_FORMAT(CONVERT_TZ(tasks.assigned_at, @@session.time_zone, '+00:00'), '%%Y-%%m-%%dT%%H:%%i:%%sZ') END AS assigned_at FROM tasks + LEFT JOIN workstations ws ON ws.id = tasks.workstation_id AND ws.deleted_at IS NULL WHERE %s - ORDER BY created_at DESC, id DESC + ORDER BY tasks.created_at DESC, tasks.id DESC LIMIT ? OFFSET ?`, whereClause) items := make([]TaskListItem, 0) @@ -299,11 +306,16 @@ func (h *TaskHandler) ListTasks(c *gin.Context) { return } + hasNext := (offset + limit) < total + hasPrev := offset > 0 + c.JSON(http.StatusOK, ListTasksResponse{ - Tasks: items, - Total: total, - Limit: limit, - Offset: offset, + Items: items, + Total: total, + Limit: limit, + Offset: offset, + HasNext: hasNext, + HasPrev: hasPrev, }) } @@ -328,7 +340,7 @@ func (h *TaskHandler) GetTask(c *gin.Context) { var task TaskDetailResponse query := `SELECT - CAST(t.id AS CHAR) AS id, + t.id AS id, t.task_id AS task_id, CAST(t.batch_id AS CHAR) AS batch_id, COALESCE(t.batch_name, '') AS batch_name, diff --git a/internal/api/handlers/transfer.go b/internal/api/handlers/transfer.go index f151d7d..9432af7 100644 --- a/internal/api/handlers/transfer.go +++ b/internal/api/handlers/transfer.go @@ -111,11 +111,24 @@ func (h *TransferHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request logger.Printf("[TRANSFER] Device %s: WebSocket accept error: %v", deviceID, err) return } + + remoteIP := extractIP(r.RemoteAddr) + dc := h.hub.NewTransferConn(conn, deviceID, remoteIP) + if !h.hub.Connect(deviceID, dc) { + logger.Printf("[TRANSFER] Device %s: connection rejected (already connected)", deviceID) + if err := conn.Close(websocket.StatusPolicyViolation, "device already connected"); err != nil { + logger.Printf("[TRANSFER] WebSocket close error for device %s: %v", deviceID, err) + } + return + } + defer func() { if err := conn.Close(websocket.StatusNormalClosure, ""); err != nil { logger.Printf("[TRANSFER] WebSocket close error for device %s: %v", deviceID, err) } }() + defer h.hub.Disconnect(deviceID, dc) + defer revertRunnableTasksOnDeviceDisconnect(h.db, deviceID, h.recorderHub, h.recorderRPCTimeout, true) // Create context for this connection ctx := r.Context() @@ -137,13 +150,6 @@ func (h *TransferHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request } }() - remoteIP := extractIP(r.RemoteAddr) - - dc := h.hub.NewTransferConn(conn, deviceID, remoteIP) - h.hub.Connect(deviceID, dc) - defer h.hub.Disconnect(deviceID) - defer revertRunnableTasksOnDeviceDisconnect(h.db, deviceID, h.recorderHub, h.recorderRPCTimeout, true) - // #nosec G706 -- Set aside for now logger.Printf("[TRANSFER] Transfer %s connected from %s", deviceID, remoteIP) diff --git a/internal/services/hub.go b/internal/services/hub.go new file mode 100644 index 0000000..040e9e9 --- /dev/null +++ b/internal/services/hub.go @@ -0,0 +1,107 @@ +// SPDX-FileCopyrightText: 2026 ArcheBase +// +// SPDX-License-Identifier: MulanPSL-2.0 + +// Package services provides business logic services for Keystone Edge +package services + +import ( + "sync" + "time" + + "archebase.com/keystone-edge/internal/logger" + "github.com/coder/websocket" +) + +// Connection is implemented by every concrete WebSocket connection type +// managed by a Hub. It exposes the minimal identity and lifecycle surface +// needed by the generic Hub without leaking type-specific fields. +type Connection interface { + // GetDeviceID returns the unique device identifier for this connection. + GetDeviceID() string + // GetWSConn returns the underlying websocket connection. + GetWSConn() *websocket.Conn + // GetConnectedAt returns the time the connection was established. + GetConnectedAt() time.Time +} + +// Hub is a generic, concurrency-safe registry of WebSocket connections keyed +// by device ID. It handles the common Connect / Disconnect / Get / List +// operations so that concrete hub types (TransferHub, RecorderHub) only need +// to add their domain-specific behaviour. +// +// T must be a pointer type that implements Connection. +type Hub[T Connection] struct { + connections map[string]T + mu sync.RWMutex + label string // component label for log lines, e.g. "TRANSFER" +} + +// newHub allocates a Hub with the given log label. +func newHub[T Connection](label string) *Hub[T] { + return &Hub[T]{ + connections: make(map[string]T), + label: label, + } +} + +// connect registers conn under deviceID. If another connection is already +// registered for the same device, the new connection is rejected (caller must +// close it) and false is returned. Callers must pass a non-nil conn. +func (h *Hub[T]) connect(deviceID string, conn T) bool { + h.mu.Lock() + defer h.mu.Unlock() + + if old, exists := h.connections[deviceID]; exists { + if old.GetWSConn() != nil && old.GetWSConn() != conn.GetWSConn() { + logger.Printf("[%s] Hub: rejecting new connection for device %s (already connected)", h.label, deviceID) + return false + } + } + h.connections[deviceID] = conn + logger.Printf("[%s] Hub: device %s registered, total connections=%d", h.label, deviceID, len(h.connections)) + return true +} + +// disconnect removes the connection for deviceID only if it matches conn. +// This avoids a stale handler goroutine deleting a newer connection after +// rejecting takeover. Returns true if an entry was removed. +func (h *Hub[T]) disconnect(deviceID string, conn T) bool { + h.mu.Lock() + defer h.mu.Unlock() + + current, exists := h.connections[deviceID] + if !exists { + logger.Printf("[%s] Hub: Disconnect called for unknown device %s", h.label, deviceID) + return false + } + // Compare underlying websocket connections; type parameter T is not necessarily comparable. + cw := current.GetWSConn() + nw := conn.GetWSConn() + if cw == nil || nw == nil || cw != nw { + logger.Printf("[%s] Hub: Disconnect ignored for device %s (connection not current)", h.label, deviceID) + return false + } + delete(h.connections, deviceID) + logger.Printf("[%s] Hub: device %s disconnected", h.label, deviceID) + return true +} + +// get returns the connection for deviceID, or the zero value of T if not found. +func (h *Hub[T]) get(deviceID string) T { + h.mu.RLock() + defer h.mu.RUnlock() + return h.connections[deviceID] +} + +// list returns a snapshot of all current connections. +func (h *Hub[T]) list() []T { + h.mu.RLock() + defer h.mu.RUnlock() + + result := make([]T, 0, len(h.connections)) + for _, c := range h.connections { + result = append(result, c) + } + return result +} diff --git a/internal/services/recorder_hub.go b/internal/services/recorder_hub.go index d0059dc..52553b7 100644 --- a/internal/services/recorder_hub.go +++ b/internal/services/recorder_hub.go @@ -12,7 +12,6 @@ import ( "sync" "time" - "archebase.com/keystone-edge/internal/logger" "github.com/coder/websocket" "github.com/coder/websocket/wsjson" "github.com/google/uuid" @@ -81,6 +80,15 @@ type RecorderConn struct { StateMu sync.RWMutex } +// GetDeviceID implements Connection. +func (r *RecorderConn) GetDeviceID() string { return r.DeviceID } + +// GetWSConn implements Connection. +func (r *RecorderConn) GetWSConn() *websocket.Conn { return r.Conn } + +// GetConnectedAt implements Connection. +func (r *RecorderConn) GetConnectedAt() time.Time { return r.ConnectedAt } + // GetState returns a copy of the recorder state. func (r *RecorderConn) GetState() RecorderState { r.StateMu.RLock() @@ -97,15 +105,16 @@ func (r *RecorderConn) UpdateState(state RecorderState) { } // RecorderHub manages all active Axon Recorder WebSocket connections. +// It embeds the generic Hub[*RecorderConn] for connection lifecycle and +// adds the RPC request/response matching layer on top. type RecorderHub struct { - mu sync.RWMutex - connections map[string]*RecorderConn + *Hub[*RecorderConn] } // NewRecorderHub creates a new RecorderHub. func NewRecorderHub() *RecorderHub { return &RecorderHub{ - connections: make(map[string]*RecorderConn), + Hub: newHub[*RecorderConn]("RECORDER"), } } @@ -125,35 +134,23 @@ func (h *RecorderHub) NewRecorderConn(conn *websocket.Conn, deviceID, remoteIP s } } -// Connect registers a recorder connection, replacing any previous one for the same device. -func (h *RecorderHub) Connect(deviceID string, rc *RecorderConn) { - h.mu.Lock() - defer h.mu.Unlock() - if old, exists := h.connections[deviceID]; exists && old != nil && old.Conn != nil && old != rc { - logger.Printf("[RECORDER] RecorderHub: closing previous connection for device %s (replaced by new)", deviceID) - _ = old.Conn.Close(websocket.StatusPolicyViolation, "replaced by newer connection") - } - h.connections[deviceID] = rc - logger.Printf("[RECORDER] RecorderHub: device %s registered, total connections=%d", deviceID, len(h.connections)) +// Connect registers a recorder connection. If the device already has an active +// connection, returns false and the caller must close the new WebSocket. +func (h *RecorderHub) Connect(deviceID string, rc *RecorderConn) bool { + return h.connect(deviceID, rc) } -// Disconnect removes a recorder connection and closes pending waiters. -func (h *RecorderHub) Disconnect(deviceID string) { - h.mu.Lock() - rc := h.connections[deviceID] - delete(h.connections, deviceID) - h.mu.Unlock() - - if rc == nil { - logger.Printf("[RECORDER] RecorderHub: Disconnect called for unknown device %s", deviceID) +// Disconnect removes a recorder connection and drains any pending RPC waiters. +func (h *RecorderHub) Disconnect(deviceID string, rc *RecorderConn) { + if !h.disconnect(deviceID, rc) { return } - logger.Printf("[RECORDER] RecorderHub: device %s disconnected", deviceID) + // Unblock any goroutines waiting for an RPC response from this device. rc.PendingMu.Lock() for requestID, pending := range rc.Pending { delete(rc.Pending, requestID) - // Use non-blocking send to avoid panic if channel is already closed + // Non-blocking send: the waiter may have already timed out. select { case pending.ResponseC <- &RPCResponse{ RequestID: requestID, @@ -161,7 +158,6 @@ func (h *RecorderHub) Disconnect(deviceID string) { Message: ErrRecorderNotConnected.Error(), }: default: - // Channel already received or closed, skip } } rc.PendingMu.Unlock() @@ -169,17 +165,14 @@ func (h *RecorderHub) Disconnect(deviceID string) { // Get returns the recorder connection for a device, or nil if not connected. func (h *RecorderHub) Get(deviceID string) *RecorderConn { - h.mu.RLock() - defer h.mu.RUnlock() - return h.connections[deviceID] + return h.get(deviceID) } // ListDevices returns a snapshot of all connected recorders. func (h *RecorderHub) ListDevices() []RecorderInfo { - h.mu.RLock() - defer h.mu.RUnlock() - result := make([]RecorderInfo, 0, len(h.connections)) - for _, rc := range h.connections { + conns := h.list() + result := make([]RecorderInfo, 0, len(conns)) + for _, rc := range conns { result = append(result, RecorderInfo{ DeviceID: rc.DeviceID, RemoteIP: rc.RemoteIP, diff --git a/internal/services/task_manager.go b/internal/services/task_manager.go deleted file mode 100644 index 1283ab3..0000000 --- a/internal/services/task_manager.go +++ /dev/null @@ -1,109 +0,0 @@ -// SPDX-FileCopyrightText: 2026 ArcheBase -// -// SPDX-License-Identifier: MulanPSL-2.0 - -// Package services provides task management service -package services - -import ( - "sync" - - "archebase.com/keystone-edge/internal/logger" -) - -// TaskManager task manager -type TaskManager struct { - mu sync.RWMutex - tasks map[string]*Task -} - -// Task represents a task -type Task struct { - ID string - BatchID string - SceneID string - WorkstationID string - RobotID string - OperatorID string - Status string // pending, in_progress, completed, failed - Priority int -} - -// NewTaskManager creates a new task manager -func NewTaskManager() *TaskManager { - tm := &TaskManager{ - tasks: make(map[string]*Task), - } - logger.Println("[TASK] TaskManager initialized") - return tm -} - -// Create creates a new task -func (tm *TaskManager) Create(task *Task) error { - tm.mu.Lock() - defer tm.mu.Unlock() - - if _, exists := tm.tasks[task.ID]; exists { - return ErrTaskAlreadyExists - } - - tm.tasks[task.ID] = task - logger.Printf("[TASK] Created task: %s", task.ID) - return nil -} - -// Get retrieves a task by ID -func (tm *TaskManager) Get(id string) (*Task, error) { - tm.mu.RLock() - defer tm.mu.RUnlock() - - task, exists := tm.tasks[id] - if !exists { - return nil, ErrTaskNotFound - } - return task, nil -} - -// List returns all tasks -func (tm *TaskManager) List() []*Task { - tm.mu.RLock() - defer tm.mu.RUnlock() - - tasks := make([]*Task, 0, len(tm.tasks)) - for _, task := range tm.tasks { - tasks = append(tasks, task) - } - return tasks -} - -// UpdateStatus updates the status of a task -func (tm *TaskManager) UpdateStatus(id, status string) error { - tm.mu.Lock() - defer tm.mu.Unlock() - - task, exists := tm.tasks[id] - if !exists { - return ErrTaskNotFound - } - - oldStatus := task.Status - task.Status = status - logger.Printf("[TASK] Task %s status: %s -> %s", id, oldStatus, status) - return nil -} - -// Error definitions -var ( - ErrTaskAlreadyExists = &TaskError{Code: "TASK_EXISTS", Message: "task already exists"} - ErrTaskNotFound = &TaskError{Code: "TASK_NOT_FOUND", Message: "task not found"} -) - -// TaskError represents a task-related error -type TaskError struct { - Code string - Message string -} - -func (e *TaskError) Error() string { - return e.Message -} diff --git a/internal/services/task_manager_test.go b/internal/services/task_manager_test.go deleted file mode 100644 index c62b9e2..0000000 --- a/internal/services/task_manager_test.go +++ /dev/null @@ -1,249 +0,0 @@ -// SPDX-FileCopyrightText: 2026 ArcheBase -// -// SPDX-License-Identifier: MulanPSL-2.0 - -// services/task_manager_test.go - Task manager tests -package services - -import ( - "fmt" - "sync" - "testing" -) - -func TestNewTaskManager(t *testing.T) { - tm := NewTaskManager() - - if tm == nil { - t.Fatal("NewTaskManager() returned nil") - } - - if tm.tasks == nil { - t.Error("NewTaskManager().tasks not initialized") - } - - // Should be empty list - tasks := tm.List() - if len(tasks) != 0 { - t.Errorf("NewTaskManager().List() = %v, want empty", tasks) - } -} - -func TestTaskManagerCreate(t *testing.T) { - tm := NewTaskManager() - - task := &Task{ - ID: "task-001", - BatchID: "batch-001", - SceneID: "scene-001", - WorkstationID: "ws-001", - RobotID: "robot-001", - OperatorID: "operator-001", - Status: "pending", - Priority: 1, - } - - err := tm.Create(task) - if err != nil { - t.Fatalf("Create() error = %v", err) - } - - // Verify task was created - got, err := tm.Get("task-001") - if err != nil { - t.Fatalf("Get() error = %v", err) - } - - if got.ID != task.ID { - t.Errorf("Get().ID = %v, want %v", got.ID, task.ID) - } - - if got.Status != task.Status { - t.Errorf("Get().Status = %v, want %v", got.Status, task.Status) - } -} - -func TestTaskManagerCreateDuplicate(t *testing.T) { - tm := NewTaskManager() - - task := &Task{ - ID: "task-001", - BatchID: "batch-001", - SceneID: "scene-001", - RobotID: "robot-001", - Status: "pending", - } - - // First create should succeed - err := tm.Create(task) - if err != nil { - t.Fatalf("First Create() error = %v", err) - } - - // Second create should fail - err = tm.Create(task) - if err != ErrTaskAlreadyExists { - t.Errorf("Duplicate Create() error = %v, want %v", err, ErrTaskAlreadyExists) - } -} - -func TestTaskManagerGet(t *testing.T) { - tm := NewTaskManager() - - // Get non-existent task - _, err := tm.Get("nonexistent") - if err != ErrTaskNotFound { - t.Errorf("Get() non-existent task error = %v, want %v", err, ErrTaskNotFound) - } - - // Create task then get - task := &Task{ - ID: "task-001", - BatchID: "batch-001", - SceneID: "scene-001", - RobotID: "robot-001", - Status: "pending", - } - tm.Create(task) - - got, err := tm.Get("task-001") - if err != nil { - t.Fatalf("Get() error = %v", err) - } - - if got.ID != "task-001" { - t.Errorf("Get().ID = %v, want task-001", got.ID) - } -} - -func TestTaskManagerList(t *testing.T) { - tm := NewTaskManager() - - // Empty list - tasks := tm.List() - if len(tasks) != 0 { - t.Errorf("List() = %v, want empty", tasks) - } - - // Add a few tasks - for i := 1; i <= 3; i++ { - task := &Task{ - ID: fmt.Sprintf("task-%03d", i), - BatchID: "batch-001", - SceneID: "scene-001", - RobotID: "robot-001", - Status: "pending", - } - tm.Create(task) - } - - tasks = tm.List() - if len(tasks) != 3 { - t.Errorf("List() length = %v, want 3", len(tasks)) - } -} - -func TestTaskManagerUpdateStatus(t *testing.T) { - tm := NewTaskManager() - - // Create task - task := &Task{ - ID: "task-001", - BatchID: "batch-001", - SceneID: "scene-001", - RobotID: "robot-001", - Status: "pending", - } - tm.Create(task) - - // Update status - err := tm.UpdateStatus("task-001", "in_progress") - if err != nil { - t.Fatalf("UpdateStatus() error = %v", err) - } - - // Verify status updated - got, _ := tm.Get("task-001") - if got.Status != "in_progress" { - t.Errorf("UpdateStatus() then Get().Status = %v, want in_progress", got.Status) - } - - // Continue update to completed - tm.UpdateStatus("task-001", "completed") - got, _ = tm.Get("task-001") - if got.Status != "completed" { - t.Errorf("UpdateStatus() then Get().Status = %v, want completed", got.Status) - } -} - -func TestTaskManagerUpdateStatusNotFound(t *testing.T) { - tm := NewTaskManager() - - err := tm.UpdateStatus("nonexistent", "in_progress") - if err != ErrTaskNotFound { - t.Errorf("UpdateStatus() non-existent task error = %v, want %v", err, ErrTaskNotFound) - } -} - -func TestTaskManagerConcurrent(t *testing.T) { - tm := NewTaskManager() - const goroutines = 50 - const tasksPerGoroutine = 10 - - var wg sync.WaitGroup - wg.Add(goroutines) - - // Concurrent create tasks - for i := 0; i < goroutines; i++ { - go func(idx int) { - defer wg.Done() - for j := 0; j < tasksPerGoroutine; j++ { - taskID := fmt.Sprintf("task-%d-%d", idx, j) - task := &Task{ - ID: taskID, - BatchID: "batch-001", - SceneID: "scene-001", - RobotID: "robot-001", - Status: "pending", - } - tm.Create(task) - } - }(i) - } - - wg.Wait() - - // Verify task count - tasks := tm.List() - expectedTasks := goroutines * tasksPerGoroutine - if len(tasks) != expectedTasks { - t.Errorf("Concurrent create List() length = %v, want %v", len(tasks), expectedTasks) - } -} - -func TestTaskError(t *testing.T) { - tests := []struct { - name string - err *TaskError - wantMsg string - }{ - { - name: "Task already exists error", - err: ErrTaskAlreadyExists, - wantMsg: "task already exists", - }, - { - name: "Task not found error", - err: ErrTaskNotFound, - wantMsg: "task not found", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if tt.err.Error() != tt.wantMsg { - t.Errorf("TaskError.Error() = %v, want %v", tt.err.Error(), tt.wantMsg) - } - }) - } -} diff --git a/internal/services/transfer_hub.go b/internal/services/transfer_hub.go index e0de671..fbe3a7d 100644 --- a/internal/services/transfer_hub.go +++ b/internal/services/transfer_hub.go @@ -11,7 +11,6 @@ import ( "sync" "time" - "archebase.com/keystone-edge/internal/logger" "github.com/coder/websocket" "github.com/coder/websocket/wsjson" ) @@ -108,6 +107,15 @@ type TransferConn struct { StatusMu sync.RWMutex } +// GetDeviceID implements Connection. +func (d *TransferConn) GetDeviceID() string { return d.DeviceID } + +// GetWSConn implements Connection. +func (d *TransferConn) GetWSConn() *websocket.Conn { return d.Conn } + +// GetConnectedAt implements Connection. +func (d *TransferConn) GetConnectedAt() time.Time { return d.ConnectedAt } + // RecordEvent appends an event to the device's ring buffer func (d *TransferConn) RecordEvent(direction string, payload map[string]interface{}) { d.events.Push(DeviceEvent{ @@ -137,17 +145,27 @@ func (d *TransferConn) GetStatus() DeviceStatus { return d.Status } -// TransferHub manages all active WebSocket device connections +// DeviceInfo is a read-only snapshot of a connected device +type DeviceInfo struct { + DeviceID string `json:"device_id"` + RemoteIP string `json:"remote_ip"` + ConnectedAt time.Time `json:"connected_at"` + LastSeenAt time.Time `json:"last_seen_at"` + Status DeviceStatus `json:"status"` +} + +// TransferHub manages all active WebSocket device connections. +// It embeds the generic Hub[*TransferConn] to handle all concurrency and +// bookkeeping, and only adds Transfer-specific behaviour on top. type TransferHub struct { - connections map[string]*TransferConn - mu sync.RWMutex + *Hub[*TransferConn] maxEventsPerDev int } // NewTransferHub creates a new TransferHub func NewTransferHub(maxEventsPerDevice int) *TransferHub { return &TransferHub{ - connections: make(map[string]*TransferConn), + Hub: newHub[*TransferConn]("TRANSFER"), maxEventsPerDev: maxEventsPerDevice, } } @@ -164,45 +182,29 @@ func (h *TransferHub) NewTransferConn(conn *websocket.Conn, deviceID, remoteIP s } } -// Connect registers a device connection -func (h *TransferHub) Connect(deviceID string, dc *TransferConn) { - h.mu.Lock() - defer h.mu.Unlock() - if old, exists := h.connections[deviceID]; exists && old != nil && old.Conn != nil && old != dc { - logger.Printf("[TRANSFER] TransferHub: closing previous connection for device %s (replaced by new)", deviceID) - _ = old.Conn.Close(websocket.StatusPolicyViolation, "replaced by newer connection") - } - h.connections[deviceID] = dc - logger.Printf("[TRANSFER] TransferHub: device %s registered, total connections=%d", deviceID, len(h.connections)) +// Connect registers a device connection. If the device already has an active +// connection, returns false and the caller must close the new WebSocket. +func (h *TransferHub) Connect(deviceID string, dc *TransferConn) bool { + return h.connect(deviceID, dc) } // Disconnect removes a device connection -func (h *TransferHub) Disconnect(deviceID string) { - h.mu.Lock() - dc := h.connections[deviceID] - delete(h.connections, deviceID) - h.mu.Unlock() - - if dc == nil { - logger.Printf("[TRANSFER] TransferHub: Disconnect called for unknown device %s", deviceID) +func (h *TransferHub) Disconnect(deviceID string, dc *TransferConn) { + if !h.disconnect(deviceID, dc) { return } - logger.Printf("[TRANSFER] TransferHub: device %s disconnected", deviceID) } // Get returns the TransferConn for a device, or nil if not connected func (h *TransferHub) Get(deviceID string) *TransferConn { - h.mu.RLock() - defer h.mu.RUnlock() - return h.connections[deviceID] + return h.get(deviceID) } // ListDevices returns a snapshot of all connected device IDs and their metadata func (h *TransferHub) ListDevices() []DeviceInfo { - h.mu.RLock() - defer h.mu.RUnlock() - result := make([]DeviceInfo, 0, len(h.connections)) - for _, dc := range h.connections { + conns := h.list() + result := make([]DeviceInfo, 0, len(conns)) + for _, dc := range conns { result = append(result, DeviceInfo{ DeviceID: dc.DeviceID, RemoteIP: dc.RemoteIP, @@ -214,15 +216,6 @@ func (h *TransferHub) ListDevices() []DeviceInfo { return result } -// DeviceInfo is a read-only snapshot of a connected device -type DeviceInfo struct { - DeviceID string `json:"device_id"` - RemoteIP string `json:"remote_ip"` - ConnectedAt time.Time `json:"connected_at"` - LastSeenAt time.Time `json:"last_seen_at"` - Status DeviceStatus `json:"status"` -} - // SendToDevice sends a JSON message to a connected device via WebSocket func (h *TransferHub) SendToDevice(ctx context.Context, deviceID string, msg map[string]interface{}) error { dc := h.Get(deviceID) diff --git a/pkg/models/models.go b/pkg/models/models.go deleted file mode 100644 index cc5814b..0000000 --- a/pkg/models/models.go +++ /dev/null @@ -1,76 +0,0 @@ -// SPDX-FileCopyrightText: 2026 ArcheBase -// -// SPDX-License-Identifier: MulanPSL-2.0 - -// Package models provides data model definitions -package models - -import "time" - -// Task 任务模型 -type Task struct { - ID string `json:"id" db:"id"` - BatchID string `json:"batch_id" db:"batch_id"` - SceneID string `json:"scene_id" db:"scene_id"` - WorkstationID string `json:"workstation_id" db:"workstation_id"` - RobotID string `json:"robot_id" db:"robot_id"` - OperatorID string `json:"operator_id" db:"operator_id"` - Status string `json:"status" db:"status"` // pending, in_progress, completed, failed - Priority int `json:"priority" db:"priority"` - CreatedAt time.Time `json:"created_at" db:"created_at"` - UpdatedAt time.Time `json:"updated_at" db:"updated_at"` -} - -// Episode Episode 模型 -type Episode struct { - ID string `json:"id" db:"id"` - TaskID string `json:"task_id" db:"task_id"` - BatchID string `json:"batch_id" db:"batch_id"` - SceneID string `json:"scene_id" db:"scene_id"` - RobotID string `json:"robot_id" db:"robot_id"` - StartTime time.Time `json:"start_time" db:"start_time"` - EndTime time.Time `json:"end_time" db:"end_time"` - FileSizeBytes int64 `json:"file_size_bytes" db:"file_size_bytes"` - FilePath string `json:"file_path" db:"file_path"` // S3 路径 - SidecarPath string `json:"sidecar_path" db:"sidecar_path"` // 元数据路径 - QAStatus string `json:"qa_status" db:"qa_status"` // pending, approved, rejected - QAScore float64 `json:"qa_score" db:"qa_score"` - QAComment string `json:"qa_comment" db:"qa_comment"` - CloudSynced bool `json:"cloud_synced" db:"cloud_synced"` - CreatedAt time.Time `json:"created_at" db:"created_at"` - UpdatedAt time.Time `json:"updated_at" db:"updated_at"` -} - -// Workstation 工作站模型 -type Workstation struct { - ID string `json:"id" db:"id"` - Name string `json:"name" db:"name"` - Location string `json:"location" db:"location"` - Status string `json:"status" db:"status"` // online, offline, maintenance - CurrentRobotID string `json:"current_robot_id" db:"current_robot_id"` - CreatedAt time.Time `json:"created_at" db:"created_at"` - UpdatedAt time.Time `json:"updated_at" db:"updated_at"` -} - -// Scene 场景模型 -type Scene struct { - ID string `json:"id" db:"id"` - Name string `json:"name" db:"name"` - Description string `json:"description" db:"description"` - Version int `json:"version" db:"version"` - Active bool `json:"active" db:"active"` - SyncedAt time.Time `json:"synced_at" db:"synced_at"` -} - -// HealthResponse 健康检查响应 -type HealthResponse struct { - Status string `json:"status"` - Timestamp string `json:"timestamp"` - Components map[string]ComponentHealth `json:"components"` -} - -// ComponentHealth 组件健康状态 -type ComponentHealth struct { - Status string `json:"status"` - Message string `json:"message,omitempty"` -}