Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ release/
# Claude Code
.claude/settings.local.json

# Agent guides (local only)
AGENTS.md

# swagger
docs/docs.go
docs/swagger.json
Expand Down
8 changes: 4 additions & 4 deletions ROADMAP.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,16 @@ First Release Implementation Order:
Second Release Implementation Order:

1. **Authentication & Authorization**
- JWT authentication
- JWT authentication
- Role-based access control (RBAC)
- API private key management
2. **Scene & Skill Management**
- ✅ scene and subscene CRUD
- ✅ skill and sop CRUD
3. **Order & Task Management**
- order CRUD
- batch CRUD
- task dispatch
- order CRUD
- batch CRUD
- task dispatch lifecycle
4. **upload queue**
- Upload prioritization and bandwidth throttling
5. **QA Pipeline**
Expand Down
19 changes: 13 additions & 6 deletions internal/api/handlers/axon_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,21 +152,28 @@ func (h *RecorderHandler) HandleWebSocket(w http.ResponseWriter, r *http.Request
logger.Printf("[RECORDER] Device %s: WebSocket accept error: %v", deviceID, err)
return
}

remoteIP := extractIP(r.RemoteAddr)
rc := h.hub.NewRecorderConn(conn, deviceID, remoteIP)
if !h.hub.Connect(deviceID, rc) {
logger.Printf("[RECORDER] Device %s: connection rejected (already connected)", deviceID)
if err := conn.Close(websocket.StatusPolicyViolation, "device already connected"); err != nil {
logger.Printf("[RECORDER] Device %s: WebSocket close error: %v", deviceID, err)
}
return
}

defer func() {
if err := conn.Close(websocket.StatusNormalClosure, ""); err != nil {
logger.Printf("[RECORDER] Device %s: WebSocket close error: %v", deviceID, err)
}
}()
defer h.hub.Disconnect(deviceID, rc)
defer revertRunnableTasksOnDeviceDisconnect(h.db, deviceID, nil, 0, false)

ctx := r.Context()
go h.pingLoop(ctx, conn)

remoteIP := extractIP(r.RemoteAddr)
rc := h.hub.NewRecorderConn(conn, deviceID, remoteIP)
h.hub.Connect(deviceID, rc)
defer h.hub.Disconnect(deviceID)
defer revertRunnableTasksOnDeviceDisconnect(h.db, deviceID, nil, 0, false)

// #nosec G706 -- Set aside for now
logger.Printf("[RECORDER] Recorder %s connected from %s", deviceID, remoteIP)

Expand Down
45 changes: 36 additions & 9 deletions internal/api/handlers/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,12 @@ type BatchListItem struct {

// ListBatchesResponse represents the response body for listing batches.
type ListBatchesResponse struct {
Batches []BatchListItem `json:"batches"`
Items []BatchListItem `json:"items"`
Total int `json:"total"`
Limit int `json:"limit"`
Offset int `json:"offset"`
HasNext bool `json:"hasNext,omitempty"`
HasPrev bool `json:"hasPrev,omitempty"`
}

type batchRow struct {
Expand All @@ -145,8 +147,8 @@ type batchRow struct {
StartedAt sql.NullTime `db:"started_at"`
EndedAt sql.NullTime `db:"ended_at"`
Metadata sql.NullString `db:"metadata"`
CreatedAt sql.NullString `db:"created_at"`
UpdatedAt sql.NullString `db:"updated_at"`
CreatedAt sql.NullTime `db:"created_at"`
UpdatedAt sql.NullTime `db:"updated_at"`
}

func parseNullableJSON(v sql.NullString) any {
Expand Down Expand Up @@ -182,11 +184,11 @@ func batchListItemFromRow(r batchRow) BatchListItem {

createdAt := ""
if r.CreatedAt.Valid {
createdAt = r.CreatedAt.String
createdAt = r.CreatedAt.Time.UTC().Format(time.RFC3339)
}
updatedAt := ""
if r.UpdatedAt.Valid {
updatedAt = r.UpdatedAt.String
updatedAt = r.UpdatedAt.Time.UTC().Format(time.RFC3339)
}

nameOut := ""
Expand Down Expand Up @@ -339,11 +341,16 @@ func (h *BatchHandler) ListBatches(c *gin.Context) {
batches = append(batches, batchListItemFromRow(r))
}

hasNext := (offset + limit) < total
hasPrev := offset > 0

c.JSON(http.StatusOK, ListBatchesResponse{
Batches: batches,
Items: batches,
Total: total,
Limit: limit,
Offset: offset,
HasNext: hasNext,
HasPrev: hasPrev,
})
}

Expand Down Expand Up @@ -1442,6 +1449,10 @@ func (h *BatchHandler) AdjustBatchTasks(c *gin.Context) {
return
}

// If task deletions/inserts made the batch terminal, advance status accordingly.
// Example: target reduced from 2 -> 1 after 1 task completed; remaining tasks may all be terminal.
tryAdvanceBatchStatus(h.db, batchNumID)

c.JSON(http.StatusOK, AdjustBatchTasksResponse{
CreatedTasks: createdTasks,
DeletedTaskIDs: deletedTaskIDs,
Expand Down Expand Up @@ -1650,7 +1661,7 @@ func (h *BatchHandler) ListBatchTasks(c *gin.Context) {
}

c.JSON(http.StatusOK, ListTasksResponse{
Tasks: items,
Items: items,
Total: len(items),
Limit: len(items),
Offset: 0,
Expand Down Expand Up @@ -1815,8 +1826,24 @@ func tryAdvanceBatchStatus(db *sqlx.DB, batchID int64) {

switch info.Status {
case "pending":
// Advance to active: a task just reached a terminal state, so this batch has started.
// Then immediately re-evaluate completion: it's possible all tasks are already terminal.
// Advance to active only if at least one task is already in terminal state.
// Historically this function was only called after a task reached terminal state,
// but it may also be invoked after batch task adjustments (create/delete), so we
// must guard against incorrectly advancing a never-started batch.
var terminalCount int
if err := tx.Get(&terminalCount, `
SELECT COUNT(*) FROM tasks
WHERE batch_id = ? AND deleted_at IS NULL
AND status IN ('completed', 'failed', 'cancelled')`,
batchID); err != nil {
logger.Printf("[BATCH] tryAdvanceBatchStatus: failed to count terminal tasks for batch %d: %v", batchID, err)
return
}
if terminalCount == 0 {
return
}

// Now it's safe to mark started; then re-evaluate completion.
if _, err := tx.Exec(
"UPDATE batches SET status = 'active', started_at = ?, updated_at = ? WHERE id = ? AND status = 'pending' AND deleted_at IS NULL",
now, now, batchID,
Expand Down
96 changes: 66 additions & 30 deletions internal/api/handlers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,80 @@ package handlers
import (
"database/sql"
"encoding/json"
"net/http"
"strconv"
"strings"
"time"
"unicode"
"unicode/utf8"

"github.com/gin-gonic/gin"
)

// maxSlugLength matches VARCHAR(100) for slug columns in the schema.
const maxSlugLength = 100
const (
defaultLimit = 50
maxLimit = 100
maxSlugLength = 100
)

// invalidSlugUserMessage is returned when slug fails isValidSlug (length or charset).
const invalidSlugUserMessage = "slug must be at most 100 characters and contain only letters, digits, and hyphens"

// PaginationParams represents normalized pagination input from query parameters.
type PaginationParams struct {
Limit int
Offset int
}

// ListResponse is a generic list payload with pagination metadata.
type ListResponse struct {
Items interface{} `json:"items"`
Total int `json:"total"`
Limit int `json:"limit"`
Offset int `json:"offset"`
HasNext bool `json:"hasNext,omitempty"`
HasPrev bool `json:"hasPrev,omitempty"`
}

// ParsePagination reads and validates pagination query parameters from the request.
func ParsePagination(c *gin.Context) (PaginationParams, error) {
limit := defaultLimit
offset := 0

if rawLimit := strings.TrimSpace(c.Query("limit")); rawLimit != "" {
parsedLimit, err := strconv.Atoi(rawLimit)
if err != nil || parsedLimit < 1 {
return PaginationParams{}, &PaginationError{Message: "limit must be a positive integer"}
}
if parsedLimit > maxLimit {
parsedLimit = maxLimit
}
limit = parsedLimit
}

if rawOffset := strings.TrimSpace(c.Query("offset")); rawOffset != "" {
parsedOffset, err := strconv.Atoi(rawOffset)
if err != nil || parsedOffset < 0 {
return PaginationParams{}, &PaginationError{Message: "offset must be a non-negative integer"}
}
offset = parsedOffset
}

return PaginationParams{Limit: limit, Offset: offset}, nil
}

// PaginationError is returned when pagination inputs are invalid.
type PaginationError struct {
Message string
}

func (e *PaginationError) Error() string {
return e.Message
}

// PaginationErrorResponse writes a standardized 400 response for pagination errors.
func PaginationErrorResponse(c *gin.Context, err error) {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
}

// isValidSlug checks non-empty slug, length <= maxSlugLength (in runes), and allows Unicode letters/digits plus hyphen.
func isValidSlug(s string) bool {
if s == "" {
Expand Down Expand Up @@ -80,29 +142,3 @@ func sqlNullJSONFromRaw(raw json.RawMessage) sql.NullString {
}
return sql.NullString{String: s, Valid: true}
}

func formatDBTimeToRFC3339(raw string) string {
s := strings.TrimSpace(raw)
if s == "" {
return ""
}

// MySQL commonly returns "YYYY-MM-DD HH:MM:SS" or with fractional seconds.
// Some drivers/configs may return RFC3339.
layouts := []string{
"2006-01-02 15:04:05",
"2006-01-02 15:04:05.999999",
"2006-01-02 15:04:05.999999999",
time.RFC3339Nano,
time.RFC3339,
}

for _, layout := range layouts {
if t, err := time.Parse(layout, s); err == nil {
return t.Format(time.RFC3339)
}
}

// Fallback: return original string instead of a wrong timestamp.
return s
}
Loading
Loading