Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cli/run/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func NewRunCommand(cliContext cli.Cli) *cobra.Command {
application, err := app.NewApp(device, app.Config{
ContainerHost: cliContext.GetContainerHost(),
ServiceName: cliContext.GetServiceName(),
RunOnce: command.RunOnce,
EnableMetrics: cliContext.MetricsEnabled(),
DeleteFromCloud: cliContext.DeleteFromCloud(),
DeleteOrphans: cliContext.DeleteOrphans(),
Expand Down
127 changes: 109 additions & 18 deletions pkg/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/reubenmiller/go-c8y/pkg/c8y"
"github.com/thin-edge/tedge-container-plugin/pkg/container"
"github.com/thin-edge/tedge-container-plugin/pkg/random"
"github.com/thin-edge/tedge-container-plugin/pkg/tedge"
)

Expand Down Expand Up @@ -50,11 +51,13 @@ type App struct {

Device *tedge.Target

config Config
shutdown chan struct{}
updateRequests chan ActionRequest
updateResults chan error
wg sync.WaitGroup
config Config
shutdown chan struct{}
updateRequests chan ActionRequest
updateResults chan error
wg sync.WaitGroup
pendingDeletions []tedge.Target
pendingDeletionsMu sync.Mutex
}

type Config struct {
Expand All @@ -72,6 +75,7 @@ type Config struct {
EnableEngineEvents bool
DeleteFromCloud bool
DeleteOrphans bool
RunOnce bool

HTTPHost string
HTTPPort uint16
Expand All @@ -96,6 +100,11 @@ func NewApp(device tedge.Target, config Config) (*App, error) {
KeyFile: config.KeyFile,
CAFile: config.CAFile,
}
if config.RunOnce {
// use a randomized client id in run-once mode so it doesn't affect the main
// service or any other instances also running in run-once mode
tedgeOpts.MQTTClientID = fmt.Sprintf("%s-%s#%s", config.ServiceName, random.String(8), serviceTarget.Topic())
}
tedgeClient := tedge.NewClient(device, *serviceTarget, config.ServiceName, tedgeOpts)

ctx, ctxCancel := context.WithTimeout(context.TODO(), 300*time.Second)
Expand Down Expand Up @@ -152,20 +161,29 @@ func NewApp(device tedge.Target, config Config) (*App, error) {
}

application := &App{
client: tedgeClient,
ContainerClient: containerClient,
Device: &device,
config: config,
updateRequests: make(chan ActionRequest),
updateResults: make(chan error),
shutdown: make(chan struct{}),
wg: sync.WaitGroup{},
client: tedgeClient,
ContainerClient: containerClient,
Device: &device,
config: config,
updateRequests: make(chan ActionRequest),
updateResults: make(chan error),
shutdown: make(chan struct{}),
wg: sync.WaitGroup{},
pendingDeletions: make([]tedge.Target, 0),
}

// Start background task to process requests
application.wg.Add(1)
go application.worker()

// Register MQTT route callbacks. This must be done after the struct is
// constructed (so the callbacks can reference it) but is safe to call
// here because AddRoute only registers in-process handlers — no broker
// interaction occurs.
if err := application.Subscribe(); err != nil {
return nil, err
}

return application, nil
}

Expand Down Expand Up @@ -244,6 +262,26 @@ func (a *App) Subscribe() error {
}
})

// Subscribe to cloud bridge health topics so we can retry any failed cloud
// deletions and trigger a full resync when connectivity is restored.
// Both the built-in bridge (tedge-mapper-c8y) and the mosquitto bridge
// (c8y-mapper) variants are covered.
for _, bridgeService := range []string{"tedge-mapper-c8y", "tedge-mapper-bridge-c8y", "mosquitto-c8y-bridge"} {
bridgeTopic := tedge.GetHealthTopic(*a.Device.Service(bridgeService))
slog.Info("Subscribing to bridge health topic.", "topic", bridgeTopic)
a.client.Client.AddRoute(bridgeTopic, func(c mqtt.Client, m mqtt.Message) {
if len(m.Payload()) == 0 {
return
}
if isBridgeOnline(m.Payload()) {
slog.Info("Cloud bridge is online, triggering service resync to process any pending cloud deletions.", "topic", m.Topic())
go func() {
a.updateRequests <- NewUpdateAllAction(container.FilterOptions{})
}()
}
})
}

return nil
}

Expand Down Expand Up @@ -323,6 +361,28 @@ func mustMarshalJSON(v any) []byte {
return b
}

// isBridgeOnline reports whether a bridge health payload signals that the
// cloud bridge is connected.
//
// Two payload formats are recognised:
//   - a plain "1" (online) or "0" (offline), compared after surrounding
//     whitespace is trimmed (the mosquitto bridge format);
//   - a JSON object whose "status" field equals tedge.StatusUp (the thin-edge
//     built-in bridge format).
//
// Any payload that is neither of the plain markers nor decodable JSON is
// treated as offline.
func isBridgeOnline(payload []byte) bool {
	// Plain-text markers are checked first so they never reach the JSON decoder.
	switch strings.TrimSpace(string(payload)) {
	case "1":
		return true
	case "0":
		return false
	}

	// Otherwise expect a JSON health document; only the status field matters.
	var health struct {
		Status string `json:"status"`
	}
	decodeErr := json.Unmarshal(payload, &health)
	return decodeErr == nil && health.Status == tedge.StatusUp
}

func getEventAttributes(attr map[string]string, props ...string) []string {
out := make([]string, 0)
for _, prop := range props {
Expand Down Expand Up @@ -578,6 +638,34 @@ func (a *App) doUpdate(filterOptions container.FilterOptions) error {
}
}

// Retry any cloud deletions that previously failed due to the proxy being unavailable.
if a.config.DeleteFromCloud {
a.pendingDeletionsMu.Lock()
pendingRetry := a.pendingDeletions
a.pendingDeletions = make([]tedge.Target, 0)
a.pendingDeletionsMu.Unlock()

if len(pendingRetry) > 0 {
slog.Info("Retrying previously-failed cloud deletions.", "count", len(pendingRetry))
for _, target := range pendingRetry {
// Skip if the service has re-registered itself since the deletion was queued.
if _, reregistered := entities[target.TopicID]; reregistered {
slog.Info("Skipping pending cloud deletion: service re-registered.", "topic", target.Topic())
continue
}
slog.Info("Retrying cloud deletion.", "topic", target.Topic())
if _, err := tedgeClient.DeleteCumulocityManagedObject(target); err != nil {
slog.Warn("Failed to retry cloud deletion, re-queuing.", "err", err, "topic", target.Topic())
a.pendingDeletionsMu.Lock()
a.pendingDeletions = append(a.pendingDeletions, target)
a.pendingDeletionsMu.Unlock()
} else {
slog.Info("Successfully retried cloud deletion.", "topic", target.Topic())
}
}
}
}

// Delete removed values, via MQTT and c8y API
markedForDeletion := make([]tedge.Target, 0)
if removeStaleServices {
Expand All @@ -604,14 +692,17 @@ func (a *App) doUpdate(filterOptions container.FilterOptions) error {
for _, target := range markedForDeletion {
slog.Info("Removing service from the cloud", "topic", target.Topic())

// FIXME: How to handle if the device is deregistered locally, but still exists in the cloud?
// Should it try to reconcile with the cloud to delete orphaned services?
// Delete service directly from Cumulocity using the local Cumulocity Proxy
// Delete service directly from Cumulocity using the local Cumulocity Proxy.
// If the proxy is unavailable (e.g. the device is offline or the mapper is
// restarting), queue the target so the deletion is retried when the bridge
// comes back online.
target.CloudIdentity = tedgeClient.Target.CloudIdentity
if target.CloudIdentity != "" {
// Delay deleting the value
if _, err := tedgeClient.DeleteCumulocityManagedObject(target); err != nil {
slog.Warn("Failed to delete managed object.", "err", err)
slog.Warn("Failed to delete managed object, queuing for retry.", "err", err, "topic", target.Topic())
a.pendingDeletionsMu.Lock()
a.pendingDeletions = append(a.pendingDeletions, target)
a.pendingDeletionsMu.Unlock()
}
}
}
Expand Down
11 changes: 9 additions & 2 deletions pkg/tedge/tedge.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ type ClientConfig struct {

MqttHost string
MqttPort uint16
MQTTClientID string

CertFile string
KeyFile string
Expand Down Expand Up @@ -144,8 +145,11 @@ func NewClient(parent Target, target Target, serviceName string, config *ClientC
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", config.MqttHost, config.MqttPort))
}

opts.SetClientID(serviceName)
opts.SetClientID(fmt.Sprintf("%s#%s", serviceName, target.Topic()))
clientID := fmt.Sprintf("%s#%s", serviceName, target.Topic())
if config.MQTTClientID != "" {
clientID = config.MQTTClientID
}
opts.SetClientID(clientID)
opts.SetCleanSession(true)
// opts.SetOrderMatters(true)
opts.SetWill(GetHealthTopic(target), PayloadHealthStatusDown(), 1, true)
Expand All @@ -165,6 +169,9 @@ func NewClient(parent Target, target Target, serviceName string, config *ClientC
subscriptions := make(map[string]byte)
subscriptions[target.RootPrefix+"/+/+/+/+"] = 1
subscriptions[GetTopic(*target.Service("+"), "cmd", "health", "check")] = 1
// Subscribe to service health status topics so bridge online/offline
// transitions can be detected and used to retry pending cloud operations.
subscriptions[target.RootPrefix+"/+/+/service/+/status/health"] = 1
slog.Info("Subscribing to topics.", "topics", subscriptions)
tok := c.SubscribeMultiple(subscriptions, nil)
tok.Wait()
Expand Down
34 changes: 34 additions & 0 deletions tests/operations.robot
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,40 @@ Remove Orphaned Cloud Services
Start Service tedge-container-plugin
Cumulocity.Should Have Services name=manualapp4 min_count=0 max_count=0 timeout=10

Remove Orphaned Cloud Services eventually if Cumulocity Proxy is Unavailable at deletion time
[Documentation] In some instances the Cumulocity local proxy will not be available,
... so the syncing of the services in the cloud should be able to handle
... a delayed sync when the Cumulocity local proxy is unavailable, or when
... the device went offline at the time of installation or removal of a container
... or container-group.
... See https://github.com/thin-edge/tedge-container-plugin/issues/181

# create a local container manually
${operation}= Cumulocity.Execute Shell Command sudo tedge-container engine docker run -d --name manualapp4 busybox sh -c 'exec sleep infinity'
Operation Should Be SUCCESSFUL ${operation} timeout=60
Cumulocity.Should Have Services name=manualapp4 service_type=container status=up

# install a container-group
Install container-group application app6 1.0.0 app5 ${CURDIR}/data/apps/app5.tar.gz
Device Should Have Installed Software {"name": "app6", "version": "1.0.0", "softwareType": "container-group"}
Cumulocity.Should Have Services name=app6@httpd service_type=container-group status=up

Stop Service tedge-mapper-c8y

# Remove the container (manually)
DeviceLibrary.Execute Command cmd=sudo tedge-container engine docker rm manualapp4 --force

# Remove the container-group (manually as the mapper is down)
DeviceLibrary.Execute Command cmd=sudo /etc/tedge/sm-plugins/container-group remove app6 --module-version 1.0.0

# Start the service, and check that the service has been removed (without the explicit service type defined)
Sleep 15s
Start Service tedge-mapper-c8y

# Services should be eventually deleted
Cumulocity.Should Have Services name=manualapp4 min_count=0 max_count=0 timeout=10
Cumulocity.Should Have Services name=app6@httpd min_count=0 max_count=0 timeout=10

Install container group that uses host volume mount
[Setup] Start Service tedge-container-plugin
# Install container-group
Expand Down
Loading