Skip to content

Commit a921865

Browse files
committed
feat: queue pending service deletion api calls if the immediate removal fails
1 parent 1aebf73 commit a921865

2 files changed

Lines changed: 97 additions & 18 deletions

File tree

pkg/app/app.go

Lines changed: 94 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,13 @@ type App struct {
5050

5151
Device *tedge.Target
5252

53-
config Config
54-
shutdown chan struct{}
55-
updateRequests chan ActionRequest
56-
updateResults chan error
57-
wg sync.WaitGroup
53+
config Config
54+
shutdown chan struct{}
55+
updateRequests chan ActionRequest
56+
updateResults chan error
57+
wg sync.WaitGroup
58+
pendingDeletions []tedge.Target
59+
pendingDeletionsMu sync.Mutex
5860
}
5961

6062
type Config struct {
@@ -152,14 +154,15 @@ func NewApp(device tedge.Target, config Config) (*App, error) {
152154
}
153155

154156
application := &App{
155-
client: tedgeClient,
156-
ContainerClient: containerClient,
157-
Device: &device,
158-
config: config,
159-
updateRequests: make(chan ActionRequest),
160-
updateResults: make(chan error),
161-
shutdown: make(chan struct{}),
162-
wg: sync.WaitGroup{},
157+
client: tedgeClient,
158+
ContainerClient: containerClient,
159+
Device: &device,
160+
config: config,
161+
updateRequests: make(chan ActionRequest),
162+
updateResults: make(chan error),
163+
shutdown: make(chan struct{}),
164+
wg: sync.WaitGroup{},
165+
pendingDeletions: make([]tedge.Target, 0),
163166
}
164167

165168
// Start background task to process requests
@@ -244,6 +247,26 @@ func (a *App) Subscribe() error {
244247
}
245248
})
246249

250+
// Subscribe to cloud bridge health topics so we can retry any failed cloud
251+
// deletions and trigger a full resync when connectivity is restored.
252+
// The mapper (tedge-mapper-c8y), the built-in bridge (tedge-mapper-bridge-c8y)
253+
// and the mosquitto bridge (mosquitto-c8y-bridge) health topics are all covered.
254+
for _, bridgeService := range []string{"tedge-mapper-c8y", "tedge-mapper-bridge-c8y", "mosquitto-c8y-bridge"} {
255+
bridgeTopic := tedge.GetHealthTopic(*a.Device.Service(bridgeService))
256+
slog.Info("Subscribing to bridge health topic.", "topic", bridgeTopic)
257+
a.client.Client.AddRoute(bridgeTopic, func(c mqtt.Client, m mqtt.Message) {
258+
if len(m.Payload()) == 0 {
259+
return
260+
}
261+
if isBridgeOnline(m.Payload()) {
262+
slog.Info("Cloud bridge is online, triggering service resync to process any pending cloud deletions.", "topic", m.Topic())
263+
go func() {
264+
a.updateRequests <- NewUpdateAllAction(container.FilterOptions{})
265+
}()
266+
}
267+
})
268+
}
269+
247270
return nil
248271
}
249272

@@ -323,6 +346,28 @@ func mustMarshalJSON(v any) []byte {
323346
return b
324347
}
325348

349+
// isBridgeOnline returns true when a bridge health payload indicates the bridge
350+
// is online. It handles two formats:
351+
// - The mosquitto bridge format: a plain "1" (online) or "0" (offline).
352+
// - The thin-edge built-in bridge format: JSON {"status":"up"}.
353+
func isBridgeOnline(payload []byte) bool {
354+
// Mosquitto bridge publishes "1" when connected and "0" when disconnected.
355+
p := strings.TrimSpace(string(payload))
356+
if p == "1" {
357+
return true
358+
}
359+
if p == "0" {
360+
return false
361+
}
362+
var s struct {
363+
Status string `json:"status"`
364+
}
365+
if err := json.Unmarshal(payload, &s); err != nil {
366+
return false
367+
}
368+
return s.Status == tedge.StatusUp
369+
}
370+
326371
func getEventAttributes(attr map[string]string, props ...string) []string {
327372
out := make([]string, 0)
328373
for _, prop := range props {
@@ -578,6 +623,34 @@ func (a *App) doUpdate(filterOptions container.FilterOptions) error {
578623
}
579624
}
580625

626+
// Retry any cloud deletions that were queued after a previous attempt failed (e.g. because the proxy was unavailable).
627+
if a.config.DeleteFromCloud {
628+
a.pendingDeletionsMu.Lock()
629+
pendingRetry := a.pendingDeletions
630+
a.pendingDeletions = make([]tedge.Target, 0)
631+
a.pendingDeletionsMu.Unlock()
632+
633+
if len(pendingRetry) > 0 {
634+
slog.Info("Retrying previously-failed cloud deletions.", "count", len(pendingRetry))
635+
for _, target := range pendingRetry {
636+
// Skip if the service has re-registered itself since the deletion was queued.
637+
if _, reregistered := entities[target.TopicID]; reregistered {
638+
slog.Info("Skipping pending cloud deletion: service re-registered.", "topic", target.Topic())
639+
continue
640+
}
641+
slog.Info("Retrying cloud deletion.", "topic", target.Topic())
642+
if _, err := tedgeClient.DeleteCumulocityManagedObject(target); err != nil {
643+
slog.Warn("Failed to retry cloud deletion, re-queuing.", "err", err, "topic", target.Topic())
644+
a.pendingDeletionsMu.Lock()
645+
a.pendingDeletions = append(a.pendingDeletions, target)
646+
a.pendingDeletionsMu.Unlock()
647+
} else {
648+
slog.Info("Successfully retried cloud deletion.", "topic", target.Topic())
649+
}
650+
}
651+
}
652+
}
653+
581654
// Delete removed values, via MQTT and c8y API
582655
markedForDeletion := make([]tedge.Target, 0)
583656
if removeStaleServices {
@@ -604,14 +677,17 @@ func (a *App) doUpdate(filterOptions container.FilterOptions) error {
604677
for _, target := range markedForDeletion {
605678
slog.Info("Removing service from the cloud", "topic", target.Topic())
606679

607-
// FIXME: How to handle if the device is deregistered locally, but still exists in the cloud?
608-
// Should it try to reconcile with the cloud to delete orphaned services?
609-
// Delete service directly from Cumulocity using the local Cumulocity Proxy
680+
// Delete service directly from Cumulocity using the local Cumulocity Proxy.
681+
// If the proxy is unavailable (e.g. the device is offline or the mapper is
682+
// restarting), queue the target so the deletion is retried when the bridge
683+
// comes back online.
610684
target.CloudIdentity = tedgeClient.Target.CloudIdentity
611685
if target.CloudIdentity != "" {
612-
// Delay deleting the value
613686
if _, err := tedgeClient.DeleteCumulocityManagedObject(target); err != nil {
614-
slog.Warn("Failed to delete managed object.", "err", err)
687+
slog.Warn("Failed to delete managed object, queuing for retry.", "err", err, "topic", target.Topic())
688+
a.pendingDeletionsMu.Lock()
689+
a.pendingDeletions = append(a.pendingDeletions, target)
690+
a.pendingDeletionsMu.Unlock()
615691
}
616692
}
617693
}

pkg/tedge/tedge.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,9 @@ func NewClient(parent Target, target Target, serviceName string, config *ClientC
165165
subscriptions := make(map[string]byte)
166166
subscriptions[target.RootPrefix+"/+/+/+/+"] = 1
167167
subscriptions[GetTopic(*target.Service("+"), "cmd", "health", "check")] = 1
168+
// Subscribe to service health status topics so bridge online/offline
169+
// transitions can be detected and used to retry pending cloud operations.
170+
subscriptions[target.RootPrefix+"/+/+/service/+/status/health"] = 1
168171
slog.Info("Subscribing to topics.", "topics", subscriptions)
169172
tok := c.SubscribeMultiple(subscriptions, nil)
170173
tok.Wait()

0 commit comments

Comments
 (0)