Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 43 additions & 26 deletions core/capabilities/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"slices"
"strings"
"sync"
"time"

"github.com/Masterminds/semver/v3"
Expand Down Expand Up @@ -52,12 +53,14 @@ type launcher struct {
dispatcher remotetypes.Dispatcher
cachedShims cachedShims
registry *Registry
subServices []services.Service
workflowDonNotifier DonNotifier
don2donSharedPeer p2ptypes.SharedPeer
p2pStreamConfig p2ptypes.StreamConfig
metrics *launcherMetrics
localCapMgr localcapmgr.LocalCapabilityManager

muSubServices sync.Mutex
subServices []services.Service
}

// For V2 capabilities, shims are created once and their config is updated dynamically.
Expand Down Expand Up @@ -117,7 +120,6 @@ func NewLauncher(
executableServers: make(map[string]executable.Server),
},
registry: registry,
subServices: []services.Service{},
workflowDonNotifier: workflowDonNotifier,
don2donSharedPeer: don2donSharedPeer,
p2pStreamConfig: p2pStreamConfig,
Expand Down Expand Up @@ -223,40 +225,40 @@ func (w *launcher) allDONs(localRegistry *registrysyncer.LocalRegistry) []regist
}

func (w *launcher) Start(ctx context.Context) error {
if w.peerWrapper != nil && w.peerWrapper.GetPeer() != nil {
w.myPeerID = w.peerWrapper.GetPeer().ID()
return nil
}
if w.don2donSharedPeer != nil {
w.myPeerID = w.don2donSharedPeer.ID()
return nil
}
return errors.New("could not get peer ID from any source")
return w.StartOnce("CapabilitiesLauncher", func() error {
if w.peerWrapper != nil && w.peerWrapper.GetPeer() != nil {
w.myPeerID = w.peerWrapper.GetPeer().ID()
return nil
}
if w.don2donSharedPeer != nil {
w.myPeerID = w.don2donSharedPeer.ID()
return nil
}
return errors.New("could not get peer ID from any source")
})
}

func (w *launcher) Close() error {
for _, s := range w.subServices {
if err := s.Close(); err != nil {
w.lggr.Errorw("failed to close a sub-service", "name", s.Name(), "error", err)
return w.StopOnce("CapabilitiesLauncher", func() error {
for _, s := range w.subServices {
if err := s.Close(); err != nil {
w.lggr.Errorw("failed to close a sub-service", "name", s.Name(), "error", err)
}
}
}
if w.peerWrapper != nil {
return w.peerWrapper.GetPeer().UpdateConnections(map[ragetypes.PeerID]p2ptypes.StreamConfig{})
}
return nil
if w.peerWrapper != nil {
return w.peerWrapper.GetPeer().UpdateConnections(map[ragetypes.PeerID]p2ptypes.StreamConfig{})
}
return nil
})
}

// LocalCapabilityManager is initialized after the Launcher is created
func (w *launcher) SetLocalCapabilityManager(lcm localcapmgr.LocalCapabilityManager) {
w.localCapMgr = lcm
}

func (w *launcher) Ready() error {
return nil
}

func (w *launcher) HealthReport() map[string]error {
return nil
return map[string]error{w.Name(): w.Healthy()}
}

func (w *launcher) Name() string {
Expand Down Expand Up @@ -299,7 +301,16 @@ func (w *launcher) donPairsToUpdate(myID ragetypes.PeerID, localRegistry *regist
return donPairs
}

func (w *launcher) OnNewRegistry(ctx context.Context, localRegistry *registrysyncer.LocalRegistry) error {
func (w *launcher) OnNewRegistry(ctx context.Context, localRegistry *registrysyncer.LocalRegistry) (err error) {
if !w.IfNotStopped(func() {
err = w.onNewRegistry(ctx, localRegistry)
}) {
return errors.New("service has been stopped")
}
return
}

func (w *launcher) onNewRegistry(ctx context.Context, localRegistry *registrysyncer.LocalRegistry) error {
w.lggr.Debug("CapabilitiesLauncher triggered...")
w.registry.SetLocalRegistry(localRegistry)

Expand Down Expand Up @@ -672,6 +683,8 @@ func (w *launcher) addToRegistryAndSetDispatcher(ctx context.Context, capability
if err != nil {
return fmt.Errorf("failed to start capability: %w", err)
}
w.muSubServices.Lock()
defer w.muSubServices.Unlock()
w.subServices = append(w.subServices, cp)
return nil
}
Expand Down Expand Up @@ -893,6 +906,8 @@ func (w *launcher) addReceiver(ctx context.Context, capability registrysyncer.Ca
return fmt.Errorf("failed to start receiver: %w", err)
}

w.muSubServices.Lock()
defer w.muSubServices.Unlock()
w.subServices = append(w.subServices, receiver)
return nil
}
Expand Down Expand Up @@ -1004,8 +1019,10 @@ func (w *launcher) startNewShim(ctx context.Context, receiver remotetypes.Receiv
_ = receiver.Close()
return fmt.Errorf("failed to register receiver for capability %s, method %s: %w", capID, method, err)
}
w.subServices = append(w.subServices, receiver)
w.lggr.Debugw("New remote shim started successfully for capability method", "id", capID, "method", method, "donID", donID)
w.muSubServices.Lock()
defer w.muSubServices.Unlock()
w.subServices = append(w.subServices, receiver)
return nil
}

Expand Down
Loading