Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions internal/server/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,22 @@ import (

// ClientRegistry manages connected WebSocket clients thread-safely
type ClientRegistry struct {
clients map[*websocket.Conn]bool
clients map[*websocket.Conn]*sync.Mutex
mu sync.RWMutex
}

// NewClientRegistry creates a new client registry
func NewClientRegistry() *ClientRegistry {
return &ClientRegistry{
clients: make(map[*websocket.Conn]bool),
clients: make(map[*websocket.Conn]*sync.Mutex),
}
}

// Add registers a new client connection
func (r *ClientRegistry) Add(conn *websocket.Conn) {
r.mu.Lock()
defer r.mu.Unlock()
r.clients[conn] = true
r.clients[conn] = &sync.Mutex{}
}

// Remove unregisters a client connection
Expand All @@ -43,6 +43,14 @@ func (r *ClientRegistry) Count() int {

// Contains checks if a client is registered
func (r *ClientRegistry) Contains(conn *websocket.Conn) bool {
r.mu.RLock()
defer r.mu.RUnlock()
_, ok := r.clients[conn]
return ok
}

// GetMutex returns the mutex for a given connection, if it exists
func (r *ClientRegistry) GetMutex(conn *websocket.Conn) *sync.Mutex {
r.mu.RLock()
defer r.mu.RUnlock()
return r.clients[conn]
Expand All @@ -51,19 +59,28 @@ func (r *ClientRegistry) Contains(conn *websocket.Conn) bool {
// ForEach executes a function for each connected client
func (r *ClientRegistry) ForEach(fn func(*websocket.Conn)) {
r.mu.RLock()
defer r.mu.RUnlock()

// Copy keys to avoid holding lock during iteration
conns := make([]*websocket.Conn, 0, len(r.clients))
for conn := range r.clients {
conns = append(conns, conn)
}
r.mu.RUnlock()

for _, conn := range conns {
fn(conn)
}
}

// Broadcast sends a message to all connected clients
func (r *ClientRegistry) Broadcast(fn func(*websocket.Conn) error) {
r.mu.RLock()
defer r.mu.RUnlock()

conns := make([]*websocket.Conn, 0, len(r.clients))
for conn := range r.clients {
conns = append(conns, conn)
}
r.mu.RUnlock()

for _, conn := range conns {
_ = fn(conn)
}
}
9 changes: 9 additions & 0 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,15 @@ func (s *Server) NotifyClient(conn *websocket.Conn, response Response) error {
return nil
}

mu := s.clients.GetMutex(conn)
if mu == nil {
// Client disconnected or not registered
return nil
}

mu.Lock()
defer mu.Unlock()

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

Expand Down
12 changes: 7 additions & 5 deletions internal/worker/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,10 @@ func (w *Worker) processJob(job *server.PrintJob) {
err := w.executePrint(job)

duration := time.Since(startTime)
w.lastJobTime = time.Now()

// Update statistics
w.mu.Lock()
w.lastJobTime = time.Now()
if err != nil {
w.jobsFailed++
} else {
Expand Down Expand Up @@ -154,11 +154,13 @@ func (w *Worker) processJob(job *server.PrintJob) {
}
}

// Notify client
// Notify client (async to not block worker loop)
if job.ClientConn != nil && w.notifier != nil {
if err := w.notifier.NotifyClient(job.ClientConn, response); err != nil {
log.Printf("[WORKER] ⚠️ Failed to notify client for job %s: %v", job.ID, err)
}
go func() {
if err := w.notifier.NotifyClient(job.ClientConn, response); err != nil {
log.Printf("[WORKER] ⚠️ Failed to notify client for job %s: %v", job.ID, err)
}
}()
Comment on lines +157 to +163
Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spawning a goroutine per job for notifications means Worker.Stop() no longer waits for in-flight notifications. During shutdown this can leave background goroutines running while the WebSocket server is being closed, and notifications may continue up to the 5s timeout in NotifyClient. Consider tracking notification goroutines with a WaitGroup (and waiting in Stop), or sending notifications via a bounded queue/worker to avoid leaks during shutdown.

Copilot uses AI. Check for mistakes.
}
}

Expand Down
76 changes: 76 additions & 0 deletions internal/worker/processor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package worker

import (
"encoding/json"
"testing"
"time"

"github.com/adcondev/ticket-daemon/internal/server"
"github.com/coder/websocket"
)

// mockSlowNotifier simulates a slow network connection
type mockSlowNotifier struct {
delay time.Duration
}

func (m *mockSlowNotifier) NotifyClient(_ *websocket.Conn, _ server.Response) error {
time.Sleep(m.delay)
return nil
}

func TestWorkerBlockingNotification(t *testing.T) {
// Setup
jobCount := 5
notifier := &mockSlowNotifier{delay: 200 * time.Millisecond} // 200ms delay per notification

// Create job queue
jobQueue := make(chan *server.PrintJob, jobCount)

// Create worker
config := Config{DefaultPrinter: "test"}
w := NewWorker(jobQueue, notifier, config)

// Start worker
w.Start()
defer w.Stop()

// Create dummy connection (we need a non-nil pointer)
dummyConn := &websocket.Conn{}

// Prepare jobs
for j := 0; j < jobCount; j++ {
job := &server.PrintJob{
ID: "test-job",
ClientConn: dummyConn,
Document: json.RawMessage("{}"), // Invalid document to trigger failure but still notify
ReceivedAt: time.Now(),
}
jobQueue <- job
}

start := time.Now()

// Wait for processing
deadline := time.Now().Add(5 * time.Second)
for {
stats := w.Stats()
if stats.JobsProcessed+stats.JobsFailed >= int64(jobCount) {
break
}
if time.Now().After(deadline) {
t.Fatalf("Timeout waiting for jobs to process. Processed: %d, Failed: %d", stats.JobsProcessed, stats.JobsFailed)
}
time.Sleep(10 * time.Millisecond)
}

duration := time.Since(start)

// With blocking notification: 5 jobs * 200ms = 1000ms (1s)
// We expect it to take at least 1s.
if duration > 500*time.Millisecond {
t.Errorf("Expected duration < 500ms (async), got %v", duration)
Comment on lines +52 to +72
Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test relies on wall-clock timing ("duration < 500ms") plus sleep/polling, which can be flaky on loaded/slow CI runners. Consider a deterministic approach: have the notifier block on a channel, assert that the worker reports all jobs processed/failed before releasing the notifier, and then optionally add a generous upper bound as a secondary guardrail.

Copilot uses AI. Check for mistakes.
} else {
t.Logf("Blocking verified: Duration %v for %d jobs", duration, jobCount)
}
Comment on lines +22 to +75
Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test name/messages/comments are inconsistent with the assertion: it expects async behavior (duration < 500ms) but is named "TestWorkerBlockingNotification", comments say it should take ~1s, and the log says "Blocking verified" on the fast path. Please rename/update the comments/logs to reflect that the goal is verifying non-blocking/asynchronous notification behavior.

Copilot uses AI. Check for mistakes.
}
Loading