From c8eb7a4026e48fefc7b7e7edc0bed94b4828d2d3 Mon Sep 17 00:00:00 2001 From: Jordan Krage Date: Thu, 2 Apr 2026 09:48:26 -0500 Subject: [PATCH] core/capabilities: fix race for subServices --- core/capabilities/launcher.go | 69 ++++++++++++++++++++++------------- 1 file changed, 43 insertions(+), 26 deletions(-) diff --git a/core/capabilities/launcher.go b/core/capabilities/launcher.go index 32bda8c9e08..1f15ddd8087 100644 --- a/core/capabilities/launcher.go +++ b/core/capabilities/launcher.go @@ -6,6 +6,7 @@ import ( "fmt" "slices" "strings" + "sync" "time" "github.com/Masterminds/semver/v3" @@ -52,12 +53,14 @@ type launcher struct { dispatcher remotetypes.Dispatcher cachedShims cachedShims registry *Registry - subServices []services.Service workflowDonNotifier DonNotifier don2donSharedPeer p2ptypes.SharedPeer p2pStreamConfig p2ptypes.StreamConfig metrics *launcherMetrics localCapMgr localcapmgr.LocalCapabilityManager + + muSubServices sync.Mutex + subServices []services.Service } // For V2 capabilities, shims are created once and their config is updated dynamically. @@ -117,7 +120,6 @@ func NewLauncher( executableServers: make(map[string]executable.Server), }, registry: registry, - subServices: []services.Service{}, workflowDonNotifier: workflowDonNotifier, don2donSharedPeer: don2donSharedPeer, p2pStreamConfig: p2pStreamConfig, @@ -223,27 +225,31 @@ func (w *launcher) allDONs(localRegistry *registrysyncer.LocalRegistry) []regist } func (w *launcher) Start(ctx context.Context) error { - if w.peerWrapper != nil && w.peerWrapper.GetPeer() != nil { - w.myPeerID = w.peerWrapper.GetPeer().ID() - return nil - } - if w.don2donSharedPeer != nil { - w.myPeerID = w.don2donSharedPeer.ID() - return nil - } - return errors.New("could not get peer ID from any source") + return w.StartOnce("CapabilitiesLauncher", func() error { + if w.peerWrapper != nil && w.peerWrapper.GetPeer() != nil { + w.myPeerID = w.peerWrapper.GetPeer().ID() + return nil + } + if w.don2donSharedPeer != nil { + w.myPeerID = w.don2donSharedPeer.ID() + return nil + } + return errors.New("could not get peer ID from any source") + }) } func (w *launcher) Close() error { - for _, s := range w.subServices { - if err := s.Close(); err != nil { - w.lggr.Errorw("failed to close a sub-service", "name", s.Name(), "error", err) + return w.StopOnce("CapabilitiesLauncher", func() error { + for _, s := range w.subServices { + if err := s.Close(); err != nil { + w.lggr.Errorw("failed to close a sub-service", "name", s.Name(), "error", err) + } } - } - if w.peerWrapper != nil { - return w.peerWrapper.GetPeer().UpdateConnections(map[ragetypes.PeerID]p2ptypes.StreamConfig{}) - } - return nil + if w.peerWrapper != nil { + return w.peerWrapper.GetPeer().UpdateConnections(map[ragetypes.PeerID]p2ptypes.StreamConfig{}) + } + return nil + }) } // LocalCapabilityManager is initialized after the Launcher is created @@ -251,12 +257,8 @@ func (w *launcher) SetLocalCapabilityManager(lcm localcapmgr.LocalCapabilityMana w.localCapMgr = lcm } -func (w *launcher) Ready() error { - return nil -} - func (w *launcher) HealthReport() map[string]error { - return nil + return map[string]error{w.Name(): w.Healthy()} } func (w *launcher) Name() string { @@ -299,7 +301,16 @@ func (w *launcher) donPairsToUpdate(myID ragetypes.PeerID, localRegistry *regist return donPairs } -func (w *launcher) OnNewRegistry(ctx context.Context, localRegistry *registrysyncer.LocalRegistry) error { +func (w *launcher) OnNewRegistry(ctx context.Context, localRegistry *registrysyncer.LocalRegistry) (err error) { + if !w.IfNotStopped(func() { + err = w.onNewRegistry(ctx, localRegistry) + }) { + return errors.New("service has been stopped") + } + return +} + +func (w *launcher) onNewRegistry(ctx context.Context, localRegistry *registrysyncer.LocalRegistry) error { w.lggr.Debug("CapabilitiesLauncher triggered...") w.registry.SetLocalRegistry(localRegistry) @@ -672,6 +683,8 @@ func (w *launcher) addToRegistryAndSetDispatcher(ctx context.Context, capability if err != nil { return fmt.Errorf("failed to start capability: %w", err) } + w.muSubServices.Lock() + defer w.muSubServices.Unlock() w.subServices = append(w.subServices, cp) return nil } @@ -893,6 +906,8 @@ func (w *launcher) addReceiver(ctx context.Context, capability registrysyncer.Ca return fmt.Errorf("failed to start receiver: %w", err) } + w.muSubServices.Lock() + defer w.muSubServices.Unlock() w.subServices = append(w.subServices, receiver) return nil } @@ -1004,8 +1019,10 @@ func (w *launcher) startNewShim(ctx context.Context, receiver remotetypes.Receiv _ = receiver.Close() return fmt.Errorf("failed to register receiver for capability %s, method %s: %w", capID, method, err) } - w.subServices = append(w.subServices, receiver) w.lggr.Debugw("New remote shim started successfully for capability method", "id", capID, "method", method, "donID", donID) + w.muSubServices.Lock() + defer w.muSubServices.Unlock() + w.subServices = append(w.subServices, receiver) return nil }