@@ -15,6 +15,7 @@ import (
1515 mqtt "github.com/eclipse/paho.mqtt.golang"
1616 "github.com/reubenmiller/go-c8y/pkg/c8y"
1717 "github.com/thin-edge/tedge-container-plugin/pkg/container"
18+ "github.com/thin-edge/tedge-container-plugin/pkg/random"
1819 "github.com/thin-edge/tedge-container-plugin/pkg/tedge"
1920)
2021
@@ -50,11 +51,13 @@ type App struct {
5051
5152 Device * tedge.Target
5253
53- config Config
54- shutdown chan struct {}
55- updateRequests chan ActionRequest
56- updateResults chan error
57- wg sync.WaitGroup
54+ config Config
55+ shutdown chan struct {}
56+ updateRequests chan ActionRequest
57+ updateResults chan error
58+ wg sync.WaitGroup
59+ pendingDeletions []tedge.Target
60+ pendingDeletionsMu sync.Mutex
5861}
5962
6063type Config struct {
@@ -72,6 +75,7 @@ type Config struct {
7275 EnableEngineEvents bool
7376 DeleteFromCloud bool
7477 DeleteOrphans bool
78+ RunOnce bool
7579
7680 HTTPHost string
7781 HTTPPort uint16
@@ -96,6 +100,11 @@ func NewApp(device tedge.Target, config Config) (*App, error) {
96100 KeyFile : config .KeyFile ,
97101 CAFile : config .CAFile ,
98102 }
103+ if config .RunOnce {
104+ // use a randomized client id in run-once mode so it doesn't affect the main
105+ // service or any other instances also running in run-once mode
106+ tedgeOpts .MQTTClientID = fmt .Sprintf ("%s-%s#%s" , config .ServiceName , random .String (8 ), serviceTarget .Topic ())
107+ }
99108 tedgeClient := tedge .NewClient (device , * serviceTarget , config .ServiceName , tedgeOpts )
100109
101110 ctx , ctxCancel := context .WithTimeout (context .TODO (), 300 * time .Second )
@@ -152,20 +161,29 @@ func NewApp(device tedge.Target, config Config) (*App, error) {
152161 }
153162
154163 application := & App {
155- client : tedgeClient ,
156- ContainerClient : containerClient ,
157- Device : & device ,
158- config : config ,
159- updateRequests : make (chan ActionRequest ),
160- updateResults : make (chan error ),
161- shutdown : make (chan struct {}),
162- wg : sync.WaitGroup {},
164+ client : tedgeClient ,
165+ ContainerClient : containerClient ,
166+ Device : & device ,
167+ config : config ,
168+ updateRequests : make (chan ActionRequest ),
169+ updateResults : make (chan error ),
170+ shutdown : make (chan struct {}),
171+ wg : sync.WaitGroup {},
172+ pendingDeletions : make ([]tedge.Target , 0 ),
163173 }
164174
165175 // Start background task to process requests
166176 application .wg .Add (1 )
167177 go application .worker ()
168178
179+ // Register MQTT route callbacks. This must be done after the struct is
180+ // constructed (so the callbacks can reference it) but is safe to call
181+ // here because AddRoute only registers in-process handlers — no broker
182+ // interaction occurs.
183+ if err := application .Subscribe (); err != nil {
184+ return nil , err
185+ }
186+
169187 return application , nil
170188}
171189
@@ -244,6 +262,26 @@ func (a *App) Subscribe() error {
244262 }
245263 })
246264
265+ // Subscribe to cloud bridge health topics so we can retry any failed cloud
266+ // deletions and trigger a full resync when connectivity is restored.
267+ // Both the built-in bridge (tedge-mapper-c8y) and the mosquitto bridge
268+ // (c8y-mapper) variants are covered.
269+ for _ , bridgeService := range []string {"tedge-mapper-c8y" , "tedge-mapper-bridge-c8y" , "mosquitto-c8y-bridge" } {
270+ bridgeTopic := tedge .GetHealthTopic (* a .Device .Service (bridgeService ))
271+ slog .Info ("Subscribing to bridge health topic." , "topic" , bridgeTopic )
272+ a .client .Client .AddRoute (bridgeTopic , func (c mqtt.Client , m mqtt.Message ) {
273+ if len (m .Payload ()) == 0 {
274+ return
275+ }
276+ if isBridgeOnline (m .Payload ()) {
277+ slog .Info ("Cloud bridge is online, triggering service resync to process any pending cloud deletions." , "topic" , m .Topic ())
278+ go func () {
279+ a .updateRequests <- NewUpdateAllAction (container.FilterOptions {})
280+ }()
281+ }
282+ })
283+ }
284+
247285 return nil
248286}
249287
@@ -323,6 +361,28 @@ func mustMarshalJSON(v any) []byte {
323361 return b
324362}
325363
364+ // isBridgeOnline returns true when a bridge health payload indicates the bridge
365+ // is online. It handles two formats:
366+ // - The mosquitto bridge format: a plain "1" (online) or "0" (offline).
367+ // - The thin-edge built-in bridge format: JSON {"status":"up"}.
368+ func isBridgeOnline (payload []byte ) bool {
369+ // Mosquitto bridge publishes "1" when connected and "0" when disconnected.
370+ p := strings .TrimSpace (string (payload ))
371+ if p == "1" {
372+ return true
373+ }
374+ if p == "0" {
375+ return false
376+ }
377+ var s struct {
378+ Status string `json:"status"`
379+ }
380+ if err := json .Unmarshal (payload , & s ); err != nil {
381+ return false
382+ }
383+ return s .Status == tedge .StatusUp
384+ }
385+
326386func getEventAttributes (attr map [string ]string , props ... string ) []string {
327387 out := make ([]string , 0 )
328388 for _ , prop := range props {
@@ -578,6 +638,34 @@ func (a *App) doUpdate(filterOptions container.FilterOptions) error {
578638 }
579639 }
580640
641+ // Retry any cloud deletions that previously failed due to the proxy being unavailable.
642+ if a .config .DeleteFromCloud {
643+ a .pendingDeletionsMu .Lock ()
644+ pendingRetry := a .pendingDeletions
645+ a .pendingDeletions = make ([]tedge.Target , 0 )
646+ a .pendingDeletionsMu .Unlock ()
647+
648+ if len (pendingRetry ) > 0 {
649+ slog .Info ("Retrying previously-failed cloud deletions." , "count" , len (pendingRetry ))
650+ for _ , target := range pendingRetry {
651+ // Skip if the service has re-registered itself since the deletion was queued.
652+ if _ , reregistered := entities [target .TopicID ]; reregistered {
653+ slog .Info ("Skipping pending cloud deletion: service re-registered." , "topic" , target .Topic ())
654+ continue
655+ }
656+ slog .Info ("Retrying cloud deletion." , "topic" , target .Topic ())
657+ if _ , err := tedgeClient .DeleteCumulocityManagedObject (target ); err != nil {
658+ slog .Warn ("Failed to retry cloud deletion, re-queuing." , "err" , err , "topic" , target .Topic ())
659+ a .pendingDeletionsMu .Lock ()
660+ a .pendingDeletions = append (a .pendingDeletions , target )
661+ a .pendingDeletionsMu .Unlock ()
662+ } else {
663+ slog .Info ("Successfully retried cloud deletion." , "topic" , target .Topic ())
664+ }
665+ }
666+ }
667+ }
668+
581669 // Delete removed values, via MQTT and c8y API
582670 markedForDeletion := make ([]tedge.Target , 0 )
583671 if removeStaleServices {
@@ -604,14 +692,17 @@ func (a *App) doUpdate(filterOptions container.FilterOptions) error {
604692 for _ , target := range markedForDeletion {
605693 slog .Info ("Removing service from the cloud" , "topic" , target .Topic ())
606694
607- // FIXME: How to handle if the device is deregistered locally, but still exists in the cloud?
608- // Should it try to reconcile with the cloud to delete orphaned services?
609- // Delete service directly from Cumulocity using the local Cumulocity Proxy
695+ // Delete service directly from Cumulocity using the local Cumulocity Proxy.
696+ // If the proxy is unavailable (e.g. the device is offline or the mapper is
697+ // restarting), queue the target so the deletion is retried when the bridge
698+ // comes back online.
610699 target .CloudIdentity = tedgeClient .Target .CloudIdentity
611700 if target .CloudIdentity != "" {
612- // Delay deleting the value
613701 if _ , err := tedgeClient .DeleteCumulocityManagedObject (target ); err != nil {
614- slog .Warn ("Failed to delete managed object." , "err" , err )
702+ slog .Warn ("Failed to delete managed object, queuing for retry." , "err" , err , "topic" , target .Topic ())
703+ a .pendingDeletionsMu .Lock ()
704+ a .pendingDeletions = append (a .pendingDeletions , target )
705+ a .pendingDeletionsMu .Unlock ()
615706 }
616707 }
617708 }
0 commit comments