From 255f6baabaaec38571a7b1ffea3e09a43e29e23f Mon Sep 17 00:00:00 2001
From: Atom
Date: Thu, 2 Oct 2025 22:28:44 +0200
Subject: [PATCH] feat: add participants list and part. status

---
 api/status/cache.go     | 113 +++++++++++++++++
 api/status/checker.go   | 272 ++++++++++++++++++++++++++++++++++++++
 api/status/evaluator.go | 271 ++++++++++++++++++++++++++++++++++++++
 api/status/models.go    |  56 ++++++++
 go.mod                  |   2 +
 main.go                 | 113 ++++++++++++++++
 6 files changed, 827 insertions(+)
 create mode 100644 api/status/cache.go
 create mode 100644 api/status/checker.go
 create mode 100644 api/status/evaluator.go
 create mode 100644 api/status/models.go

diff --git a/api/status/cache.go b/api/status/cache.go
new file mode 100644
index 0000000..2389869
--- /dev/null
+++ b/api/status/cache.go
@@ -0,0 +1,113 @@
+package status
+
+import (
+	"sync"
+	"time"
+)
+
+// statusCache is an in-memory cache of participant status responses keyed by
+// participant name. Entries expire after ttl; a background goroutine evicts
+// expired entries once a minute. All methods are safe for concurrent use.
+type statusCache struct {
+	data     map[string]*cacheEntry
+	mu       sync.RWMutex
+	ttl      time.Duration
+	stopChan chan struct{}
+	stopOnce sync.Once // makes stop idempotent
+}
+
+// cacheEntry pairs a cached response with the instant it stops being valid.
+type cacheEntry struct {
+	response  *ParticipantStatusResponse
+	expiresAt time.Time
+}
+
+// newStatusCache returns a cache whose entries live for ttl and starts its
+// cleanup goroutine. Call stop to terminate that goroutine.
+func newStatusCache(ttl time.Duration) *statusCache {
+	cache := &statusCache{
+		data:     make(map[string]*cacheEntry),
+		ttl:      ttl,
+		stopChan: make(chan struct{}),
+	}
+
+	go cache.cleanup()
+
+	return cache
+}
+
+// get returns the cached response for key, or nil if there is no entry or the
+// entry has expired. The returned pointer is shared with other callers and
+// must be treated as read-only.
+func (c *statusCache) get(key string) *ParticipantStatusResponse {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	entry, exists := c.data[key]
+	if !exists || time.Now().After(entry.expiresAt) {
+		return nil
+	}
+
+	return entry.response
+}
+
+// set stores response under key, replacing any previous entry and restarting
+// its TTL.
+func (c *statusCache) set(key string, response *ParticipantStatusResponse) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	c.data[key] = &cacheEntry{
+		response:  response,
+		expiresAt: time.Now().Add(c.ttl),
+	}
+}
+
+// cleanup evicts expired entries once a minute until stop is called.
+func (c *statusCache) cleanup() {
+	ticker := time.NewTicker(1 * time.Minute)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			c.evictExpired()
+		case <-c.stopChan:
+			// Graceful shutdown requested
+			return
+		}
+	}
+}
+
+// evictExpired removes every entry whose TTL has elapsed.
+func (c *statusCache) evictExpired() {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	now := time.Now()
+	for key, entry := range c.data {
+		if now.After(entry.expiresAt) {
+			delete(c.data, key)
+		}
+	}
+}
+
+// stop terminates the cleanup goroutine. It is safe to call more than once;
+// a bare close of stopChan would panic on the second call.
+func (c *statusCache) stop() {
+	c.stopOnce.Do(func() { close(c.stopChan) })
+}
+
+// invalidate removes the entry for participantName, if any.
+func (c *statusCache) invalidate(participantName string) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	delete(c.data, participantName)
+}
+
+// clear drops every cached entry.
+func (c *statusCache) clear() {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.data = make(map[string]*cacheEntry)
+}
diff --git a/api/status/checker.go b/api/status/checker.go
new file mode 100644
index 0000000..e424ec1
--- /dev/null
+++ b/api/status/checker.go
@@ -0,0 +1,272 @@
+package status
+
+import (
+	"context"
+	"fmt"
+	"log"
+	"time"
+
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+const (
+	// statusCacheTTL is how long a computed participant status is reused.
+	statusCacheTTL = 10 * time.Second
+	// defaultRequestTimeout bounds Kubernetes API calls when the caller's
+	// context carries no deadline of its own.
+	defaultRequestTimeout = 30 * time.Second
+)
+
+// StatusChecker computes the provisioning status of participants from the
+// state of their Kubernetes namespace and caches the results briefly.
+type StatusChecker struct {
+	kubeClient client.Client
+	cache      *statusCache
+	evaluator  *StatusEvaluator
+}
+
+// NewStatusChecker returns a StatusChecker backed by kubeClient. It starts the
+// cache's cleanup goroutine; call Close to stop it.
+func NewStatusChecker(kubeClient client.Client) *StatusChecker {
+	return &StatusChecker{
+		kubeClient: kubeClient,
+		cache:      newStatusCache(statusCacheTTL),
+		evaluator:  NewStatusEvaluator(),
+	}
+}
+
+// withDefaultTimeout returns ctx unchanged when it already has a deadline and
+// otherwise derives a child context limited to defaultRequestTimeout, so that
+// Kubernetes API calls cannot block indefinitely. The returned cancel function
+// must always be called.
+func withDefaultTimeout(ctx context.Context) (context.Context, context.CancelFunc) {
+	if _, hasDeadline := ctx.Deadline(); hasDeadline {
+		return ctx, func() {}
+	}
+	return context.WithTimeout(ctx, defaultRequestTimeout)
+}
+
+// GetParticipantStatus returns the status of the participant whose namespace
+// is participantName, serving it from the cache when a fresh entry exists.
+//
+// A missing namespace yields a StatusNotFound response rather than an error,
+// and a namespace with a deletion timestamp yields StatusDeleting; both are
+// cached. An error is returned only when the namespace or its workloads cannot
+// be read. Failing to list events is logged and otherwise ignored.
+func (sc *StatusChecker) GetParticipantStatus(ctx context.Context, participantName string) (*ParticipantStatusResponse, error) {
+	ctx, cancel := withDefaultTimeout(ctx)
+	defer cancel()
+
+	// Check cache first (to avoid redundant Kubernetes API calls)
+	if cached := sc.cache.get(participantName); cached != nil {
+		log.Printf("Cache hit for participant %s", participantName)
+		return cached, nil
+	}
+	log.Printf("Cache miss for participant %s", participantName)
+
+	namespace := &corev1.Namespace{}
+	if err := sc.kubeClient.Get(ctx, client.ObjectKey{Name: participantName}, namespace); err != nil {
+		if !errors.IsNotFound(err) {
+			return nil, fmt.Errorf("failed to get namespace: %w", err)
+		}
+		// Cache NOT_FOUND responses to avoid repeated K8s API calls
+		message := fmt.Sprintf("Namespace %s does not exist", participantName)
+		return sc.cacheEmptyResponse(participantName, StatusNotFound, message), nil
+	}
+
+	// Check if namespace is being deleted
+	if namespace.DeletionTimestamp != nil {
+		message := fmt.Sprintf("Namespace %s is being deleted", participantName)
+		return sc.cacheEmptyResponse(participantName, StatusDeleting, message), nil
+	}
+
+	// Get component statuses
+	components, err := sc.getComponentStatuses(ctx, participantName)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get component statuses: %w", err)
+	}
+
+	overallStatus, message := sc.evaluator.DetermineOverallStatus(components)
+
+	// Events are best-effort: on failure, log a warning and continue without them.
+	events, err := sc.evaluator.GetRecentEvents(ctx, sc.kubeClient, participantName)
+	if err != nil {
+		log.Printf("Warning: failed to get events for namespace %s: %v", participantName, err)
+		events = []Event{}
+	}
+
+	response := &ParticipantStatusResponse{
+		ParticipantName: participantName,
+		Status:          overallStatus,
+		LastUpdated:     time.Now(),
+		Components:      components,
+		Message:         message,
+		Events:          events,
+	}
+
+	sc.cache.set(participantName, response)
+
+	return response, nil
+}
+
+// cacheEmptyResponse builds, caches and returns a response that carries only a
+// state and a message, with an empty (non-nil) component map.
+func (sc *StatusChecker) cacheEmptyResponse(participantName string, state ProvisioningStatus, message string) *ParticipantStatusResponse {
+	response := &ParticipantStatusResponse{
+		ParticipantName: participantName,
+		Status:          state,
+		LastUpdated:     time.Now(),
+		Message:         message,
+		Components:      make(map[string]ComponentStatus),
+	}
+	sc.cache.set(participantName, response)
+	return response
+}
+
+// getComponentStatuses lists the Deployments and StatefulSets in namespace and
+// returns their evaluated status keyed by workload name. When a StatefulSet
+// and a Deployment share a name, the StatefulSet entry overwrites the other.
+func (sc *StatusChecker) getComponentStatuses(ctx context.Context, namespace string) (map[string]ComponentStatus, error) {
+	deploymentList := &appsv1.DeploymentList{}
+	if err := sc.kubeClient.List(ctx, deploymentList, client.InNamespace(namespace)); err != nil {
+		return nil, fmt.Errorf("failed to list deployments: %w", err)
+	}
+
+	// Index the items instead of ranging by value: this avoids copying every
+	// object and taking the address of the loop variable.
+	components := make(map[string]ComponentStatus, len(deploymentList.Items))
+	for i := range deploymentList.Items {
+		deployment := &deploymentList.Items[i]
+		components[deployment.Name] = sc.evaluator.GetDeploymentStatus(deployment)
+	}
+
+	statefulSetList := &appsv1.StatefulSetList{}
+	if err := sc.kubeClient.List(ctx, statefulSetList, client.InNamespace(namespace)); err != nil {
+		return nil, fmt.Errorf("failed to list statefulsets: %w", err)
+	}
+
+	for i := range statefulSetList.Items {
+		sts := &statefulSetList.Items[i]
+		components[sts.Name] = sc.evaluator.GetStatefulSetStatus(sts)
+	}
+
+	return components, nil
+}
+
+// ListParticipants returns one page of participant summaries together with the
+// total number of participants matching statusFilter (before pagination).
+//
+// A namespace counts as a participant when it is not a system namespace and
+// contains at least one provisioner deployment. An empty statusFilter matches
+// every status. page is 1-based; page and limit must both be at least 1.
+// Namespaces whose status cannot be determined are logged and skipped.
+func (sc *StatusChecker) ListParticipants(ctx context.Context, statusFilter string, page, limit int) ([]ParticipantSummary, int, error) {
+	if page < 1 || limit < 1 {
+		return nil, 0, fmt.Errorf("invalid pagination: page=%d, limit=%d (both must be >= 1)", page, limit)
+	}
+
+	ctx, cancel := withDefaultTimeout(ctx)
+	defer cancel()
+
+	namespaceList := &corev1.NamespaceList{}
+	if err := sc.kubeClient.List(ctx, namespaceList); err != nil {
+		return nil, 0, fmt.Errorf("failed to list namespaces: %w", err)
+	}
+
+	participants := make([]ParticipantSummary, 0)
+
+	for i := range namespaceList.Items {
+		name := namespaceList.Items[i].Name
+
+		// Skip system namespaces
+		if IsSystemNamespace(name) {
+			continue
+		}
+
+		hasDeployments, err := sc.hasParticipantDeployments(ctx, name)
+		if err != nil {
+			log.Printf("Warning: failed to check deployments in namespace %s: %v", name, err)
+			continue
+		}
+		if !hasDeployments {
+			continue
+		}
+
+		// Get full status (will use the in-memory cache if available)
+		status, err := sc.GetParticipantStatus(ctx, name)
+		if err != nil {
+			log.Printf("Warning: failed to get status for namespace %s: %v", name, err)
+			continue
+		}
+
+		// Apply the status filter, if any
+		if statusFilter != "" && string(status.Status) != statusFilter {
+			continue
+		}
+
+		participants = append(participants, ParticipantSummary{
+			ParticipantName: name,
+			Status:          status.Status,
+			LastUpdated:     status.LastUpdated,
+		})
+	}
+
+	total := len(participants)
+
+	// Decide whether the page exists by comparing page numbers rather than
+	// computing (page-1)*limit first, which could overflow for a huge page.
+	if total == 0 || page-1 > (total-1)/limit {
+		return []ParticipantSummary{}, total, nil
+	}
+
+	start := (page - 1) * limit
+	end := total
+	if limit < total-start {
+		end = start + limit
+	}
+
+	return participants[start:end], total, nil
+}
+
+// hasParticipantDeployments reports whether namespace contains at least one of
+// the provisioner deployments.
+func (sc *StatusChecker) hasParticipantDeployments(ctx context.Context, namespace string) (bool, error) {
+	deploymentList := &appsv1.DeploymentList{}
+	if err := sc.kubeClient.List(ctx, deploymentList, client.InNamespace(namespace)); err != nil {
+		return false, err
+	}
+
+	for i := range deploymentList.Items {
+		if isProvisionerDeployment(deploymentList.Items[i].Name) {
+			return true, nil
+		}
+	}
+
+	return false, nil
+}
+
+// isProvisionerDeployment reports whether name is one of the deployments
+// created by the provisioner. That set is exactly criticalDeployments, so
+// the list is shared rather than duplicated here.
+func isProvisionerDeployment(name string) bool {
+	return isCriticalDeployment(name)
+}
+
+// InvalidateCache drops the cached status of participantName so the next
+// lookup queries Kubernetes again.
+func (sc *StatusChecker) InvalidateCache(participantName string) {
+	sc.cache.invalidate(participantName)
+}
+
+// ClearCache drops every cached status.
+func (sc *StatusChecker) ClearCache() {
+	sc.cache.clear()
+}
+
+// Close stops the cache's background cleanup goroutine.
+func (sc *StatusChecker) Close() {
+	sc.cache.stop()
+}
diff --git a/api/status/evaluator.go b/api/status/evaluator.go
new file mode 100644
index 0000000..bdbce75
--- /dev/null
+++ b/api/status/evaluator.go
@@ -0,0 +1,271 @@
+package status
+
+import (
+	"context"
+	"fmt"
+	"sort"
+	"strings"
+	"time"
+
+	appsv1 "k8s.io/api/apps/v1"
+	corev1 "k8s.io/api/core/v1"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+const (
+	// recentEventWindow is how far back GetRecentEvents looks.
+	recentEventWindow = 30 * time.Minute
+	// maxRecentEvents caps the number of events GetRecentEvents returns.
+	maxRecentEvents = 10
+)
+
+// StatusEvaluator turns Kubernetes workload state into component and overall
+// participant statuses. It holds no state.
+type StatusEvaluator struct{}
+
+// NewStatusEvaluator returns a ready-to-use StatusEvaluator.
+func NewStatusEvaluator() *StatusEvaluator {
+	return &StatusEvaluator{}
+}
+
+// criticalDeployments lists the components that must all be ready for a
+// participant to be considered READY. These are the core components required
+// for basic functionality:
+//   - controlplane: EDC Control Plane
+//   - dataplane: EDC Data Plane
+//   - identityhub: Identity Hub
+//   - postgres: PostgreSQL
+//
+// The list is currently hardcoded but could be made configurable via an
+// environment variable or config file if more flexibility is needed.
+var criticalDeployments = []string{"controlplane", "dataplane", "identityhub", "postgres"}
+
+// isCriticalDeployment reports whether name is listed in criticalDeployments.
+func isCriticalDeployment(name string) bool {
+	for _, critical := range criticalDeployments {
+		if name == critical {
+			return true
+		}
+	}
+	return false
+}
+
+// GetDeploymentStatus summarises the replica state of deployment. A nil
+// Spec.Replicas is treated as 1, the Kubernetes default.
+func (se *StatusEvaluator) GetDeploymentStatus(deployment *appsv1.Deployment) ComponentStatus {
+	desired := int32(1)
+	if deployment.Spec.Replicas != nil {
+		desired = *deployment.Spec.Replicas
+	}
+
+	return replicaComponentStatus(
+		desired,
+		deployment.Status.Replicas,
+		deployment.Status.ReadyReplicas,
+		deployment.Status.UnavailableReplicas,
+	)
+}
+
+// GetStatefulSetStatus summarises the replica state of sts. A nil
+// Spec.Replicas is treated as 1, the Kubernetes default.
+func (se *StatusEvaluator) GetStatefulSetStatus(sts *appsv1.StatefulSet) ComponentStatus {
+	desired := int32(1)
+	if sts.Spec.Replicas != nil {
+		desired = *sts.Spec.Replicas
+	}
+
+	// The unavailable count is not consulted for StatefulSets, so pass 0.
+	return replicaComponentStatus(desired, sts.Status.Replicas, sts.Status.ReadyReplicas, 0)
+}
+
+// replicaComponentStatus classifies a workload from its replica counts. The
+// first matching rule wins:
+//   - Running: ready == desired and desired > 0
+//   - Pending: no replicas exist at all
+//   - Starting: fewer replicas are ready than desired
+//   - Degraded: some replicas are reported unavailable
+//   - Unknown: none of the above (for example more ready than desired)
+func replicaComponentStatus(desired, current, ready, unavailable int32) ComponentStatus {
+	status := "Unknown"
+	isReady := false
+	message := ""
+
+	switch {
+	case ready == desired && desired > 0:
+		status = "Running"
+		isReady = true
+	case current == 0:
+		status = "Pending"
+		message = "No pods are running"
+	case ready < desired:
+		status = "Starting"
+		message = fmt.Sprintf("%d of %d replicas ready", ready, desired)
+	case unavailable > 0:
+		status = "Degraded"
+		message = fmt.Sprintf("%d replicas unavailable", unavailable)
+	}
+
+	return ComponentStatus{
+		Status: status,
+		Ready:  isReady,
+		Replicas: ReplicaStatus{
+			Desired: desired,
+			Current: current,
+			Ready:   ready,
+		},
+		Message: message,
+	}
+}
+
+// DetermineOverallStatus derives the participant's overall status from its
+// component statuses:
+//   - no components at all: PROVISIONING
+//   - every critical and non-critical component ready: READY
+//   - every critical component ready, some non-critical not ready: DEGRADED
+//   - no critical component ready: PROVISIONING
+//   - only some critical components ready: DEGRADED (message names the first issue)
+func (se *StatusEvaluator) DetermineOverallStatus(components map[string]ComponentStatus) (ProvisioningStatus, string) {
+	if len(components) == 0 {
+		return StatusProvisioning, "No components found, provisioning may be in progress"
+	}
+
+	criticalNotReadyCount := 0
+	var messages []string
+	for _, deploymentName := range criticalDeployments {
+		component, exists := components[deploymentName]
+		switch {
+		case !exists:
+			criticalNotReadyCount++
+			messages = append(messages, fmt.Sprintf("Critical component %s not found", deploymentName))
+		case !component.Ready:
+			criticalNotReadyCount++
+			if component.Message != "" {
+				messages = append(messages, fmt.Sprintf("%s: %s", deploymentName, component.Message))
+			}
+		}
+	}
+	allCriticalReady := criticalNotReadyCount == 0
+
+	// Check non-critical components
+	anyNonCriticalNotReady := false
+	for name, component := range components {
+		if !component.Ready && !isCriticalDeployment(name) {
+			anyNonCriticalNotReady = true
+			break
+		}
+	}
+
+	switch {
+	case allCriticalReady && !anyNonCriticalNotReady:
+		return StatusReady, "All components are running and ready"
+	case allCriticalReady:
+		return StatusDegraded, "All critical components ready, but some non-critical components are not ready"
+	case criticalNotReadyCount == len(criticalDeployments):
+		// All critical components missing/not ready - likely still provisioning
+		return StatusProvisioning, "Critical components are not yet ready"
+	default:
+		// Some critical components ready, some not - degraded state
+		msg := fmt.Sprintf("%d of %d critical components not ready", criticalNotReadyCount, len(criticalDeployments))
+		if len(messages) > 0 {
+			msg = msg + ": " + messages[0] // Include first issue
+		}
+		return StatusDegraded, msg
+	}
+}
+
+// eventTimestamp returns the most meaningful time recorded on event. Events
+// written through the newer events API may leave LastTimestamp unset and
+// carry EventTime instead, so fall back to that and finally to the object's
+// creation time.
+func eventTimestamp(event *corev1.Event) time.Time {
+	switch {
+	case !event.LastTimestamp.Time.IsZero():
+		return event.LastTimestamp.Time
+	case !event.EventTime.Time.IsZero():
+		return event.EventTime.Time
+	default:
+		return event.CreationTimestamp.Time
+	}
+}
+
+// GetRecentEvents returns up to maxRecentEvents events recorded in namespace
+// during the last recentEventWindow, most recent first.
+func (se *StatusEvaluator) GetRecentEvents(ctx context.Context, kubeClient client.Client, namespace string) ([]Event, error) {
+	eventList := &corev1.EventList{}
+	if err := kubeClient.List(ctx, eventList, client.InNamespace(namespace)); err != nil {
+		return nil, fmt.Errorf("failed to list events: %w", err)
+	}
+
+	events := make([]Event, 0)
+	cutoff := time.Now().Add(-recentEventWindow)
+
+	for i := range eventList.Items {
+		item := &eventList.Items[i]
+		timestamp := eventTimestamp(item)
+		if timestamp.After(cutoff) {
+			events = append(events, Event{
+				Timestamp: timestamp,
+				Type:      item.Type,
+				Message:   item.Message,
+			})
+		}
+	}
+
+	// Sort by timestamp (most recent first)
+	sort.Slice(events, func(i, j int) bool {
+		return events[i].Timestamp.After(events[j].Timestamp)
+	})
+
+	if len(events) > maxRecentEvents {
+		events = events[:maxRecentEvents]
+	}
+
+	return events, nil
+}
+
+// IsSystemNamespace reports whether name is a built-in Kubernetes namespace
+// that can never belong to a participant.
+func IsSystemNamespace(name string) bool {
+	switch name {
+	case "kube-system", "kube-public", "kube-node-lease", "default":
+		return true
+	}
+	return false
+}
+
+// unavailablePatterns are substrings of error messages that commonly indicate
+// the Kubernetes API could not be reached.
+var unavailablePatterns = []string{
+	"connection refused",
+	"connection reset",
+	"no such host",
+	"timeout",
+	"timed out",
+	"unable to connect",
+	"dial tcp",
+	"i/o timeout",
+	"context deadline exceeded",
+	"server is currently unable",
+	"TLS handshake",
+	"network is unreachable",
+	"EOF",
+}
+
+// IsKubernetesUnavailableError reports whether err looks like a failure to
+// reach the Kubernetes API. The check is a case-sensitive substring match on
+// the error text, so it is a heuristic: unrelated errors whose message
+// happens to contain one of the patterns are also reported as unavailable.
+func IsKubernetesUnavailableError(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	errMsg := err.Error()
+	for _, pattern := range unavailablePatterns {
+		if strings.Contains(errMsg, pattern) {
+			return true
+		}
+	}
+
+	return false
+}
diff --git a/api/status/models.go b/api/status/models.go
new file mode 100644
index 0000000..a51dca1
--- /dev/null
+++ b/api/status/models.go
@@ -0,0 +1,56 @@
+package status
+
+import "time"
+
+// ProvisioningStatus is the overall lifecycle state of a participant.
+type ProvisioningStatus string
+
+// Possible values of ProvisioningStatus.
+const (
+	StatusProvisioning ProvisioningStatus = "PROVISIONING"
+	StatusReady        ProvisioningStatus = "READY"
+	StatusDegraded     ProvisioningStatus = "DEGRADED"
+	StatusFailed       ProvisioningStatus = "FAILED"
+	StatusDeleting     ProvisioningStatus = "DELETING"
+	StatusNotFound     ProvisioningStatus = "NOT_FOUND"
+)
+
+// ComponentStatus describes a single workload (Deployment or StatefulSet).
+type ComponentStatus struct {
+	Status   string        `json:"status"`
+	Ready    bool          `json:"ready"`
+	Replicas ReplicaStatus `json:"replicas"`
+	Message  string        `json:"message,omitempty"`
+}
+
+// ReplicaStatus holds the replica counts of a workload.
+type ReplicaStatus struct {
+	Desired int32 `json:"desired"`
+	Current int32 `json:"current"`
+	Ready   int32 `json:"ready"`
+}
+
+// Event is a trimmed-down Kubernetes event.
+type Event struct {
+	Timestamp time.Time `json:"timestamp"`
+	Type      string    `json:"type"`
+	Message   string    `json:"message"`
+}
+
+// ParticipantStatusResponse is the full response for
+// GET /resources/{name}/status.
+type ParticipantStatusResponse struct {
+	ParticipantName string                     `json:"participantName"`
+	Status          ProvisioningStatus         `json:"status"`
+	LastUpdated     time.Time                  `json:"lastUpdated"`
+	Components      map[string]ComponentStatus `json:"components"`
+	Message         string                     `json:"message"`
+	Events          []Event                    `json:"events,omitempty"`
+}
+
+// ParticipantSummary is one entry of the participant list.
+type ParticipantSummary struct {
+	ParticipantName string             `json:"participantName"`
+	Status          ProvisioningStatus `json:"status"`
+	LastUpdated     time.Time          `json:"lastUpdated"`
+}
diff --git a/go.mod b/go.mod
index 75a791f..f841512 100644
--- a/go.mod
+++ b/go.mod
@@ -37,6 +37,7 @@ require (
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
 	github.com/modern-go/reflect2 v1.0.2 // indirect
 	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
+	github.com/pkg/errors v0.9.1 // indirect
 	github.com/rivo/uniseg v0.2.0 // indirect
 	github.com/spf13/pflag v1.0.5 // indirect
 	github.com/valyala/bytebufferpool v1.0.0 // indirect
@@ -50,6 +51,7 @@ require (
 	golang.org/x/text v0.23.0 // indirect
 	golang.org/x/time v0.9.0 // indirect
 	google.golang.org/protobuf v1.36.5 // indirect
+	gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
 	gopkg.in/inf.v0 v0.9.1 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
 	k8s.io/klog/v2 v2.130.1 // indirect
diff --git a/main.go b/main.go
index e62378b..5e6d995 100644
--- a/main.go
+++ b/main.go
@@ -3,6 +3,7 @@ package main
 
 import (
 	"aruba-provisioner/api"
+	"aruba-provisioner/api/status"
 	"context"
 	"encoding/base64"
 	"errors"
@@ -11,6 +12,7 @@ import (
 	"log"
 	"net/http"
 	"os"
+	"strconv"
 	"strings"
 	"time"
 
@@ -85,9 +87,112 @@ func main() {
 		log.Fatalf("create client: %v", err)
 	}
 
+	statusChecker := status.NewStatusChecker(kubeClient)
+
 	app := fiber.New()
 	{
 		group := app.Group("/api/v1/resources")
+
+		group.Get("/", func(c *fiber.Ctx) error {
+			statusFilter := c.Query("status")
+
+			pageStr := c.Query("page", "1")
+			page, err := strconv.Atoi(pageStr)
+			if err != nil || page < 1 {
+				return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
+					"error":   "Invalid page parameter",
+					"details": fmt.Sprintf("page must be a positive integer, got: %s", pageStr),
+				})
+			}
+
+			limitStr := c.Query("limit", "10")
+			limit, err := strconv.Atoi(limitStr)
+			if err != nil || limit < 1 || limit > 100 {
+				return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
+					"error":   "Invalid limit parameter",
+					"details": fmt.Sprintf("limit must be an integer between 1 and 100, got: %s", limitStr),
+				})
+			}
+
+			// Validate status filter if provided
+			if statusFilter != "" {
+				validStatuses := []string{"PROVISIONING", "READY", "DEGRADED", "FAILED", "DELETING", "NOT_FOUND"}
+				isValid := false
+				for _, s := range validStatuses {
+					if statusFilter == s {
+						isValid = true
+						break
+					}
+				}
+				if !isValid {
+					return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
+						"error":   "Invalid status filter",
+						"details": fmt.Sprintf("status must be one of: %s, got: %s", strings.Join(validStatuses, ", "), statusFilter),
+					})
+				}
+			}
+
+			participants, total, err := statusChecker.ListParticipants(ctx, statusFilter, page, limit)
+			if err != nil {
+				// Check if error is due to Kubernetes API being unavailable
+				if status.IsKubernetesUnavailableError(err) {
+					c.Set("Retry-After", "30")
+					return c.Status(fiber.StatusServiceUnavailable).JSON(fiber.Map{
+						"error":   "Service temporarily unavailable",
+						"details": "Unable to connect to Kubernetes API. Please try again later.",
+					})
+				}
+				// Other internal errors
+				return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
+					"error":   "Failed to list participants",
+					"details": err.Error(),
+				})
+			}
+
+			// Set pagination headers
+			c.Set("X-Total", strconv.Itoa(total))
+			c.Set("X-Page", strconv.Itoa(page))
+			c.Set("X-Limit", strconv.Itoa(limit))
+
+			return c.JSON(participants)
+		})
+
+		group.Get("/:participantName/status", func(c *fiber.Ctx) error {
+			participantName := c.Params("participantName")
+			if participantName == "" {
+				return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{
+					"error": "participantName is required",
+				})
+			}
+
+			response, err := statusChecker.GetParticipantStatus(ctx, participantName)
+			if err != nil {
+				// Check if error is due to Kubernetes API being unavailable
+				if status.IsKubernetesUnavailableError(err) {
+					c.Set("Retry-After", "30")
+					return c.Status(fiber.StatusServiceUnavailable).JSON(fiber.Map{
+						"error":   "Service temporarily unavailable",
+						"details": "Unable to connect to Kubernetes API. Please try again later.",
+					})
+				}
+				// Other internal errors
+				return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{
+					"error":   "Failed to get participant status",
+					"details": err.Error(),
+				})
+			}
+
+			// 404 if participant not found
+			if response.Status == status.StatusNotFound {
+				return c.Status(fiber.StatusNotFound).JSON(fiber.Map{
+					"message": response.Message,
+					"status":  response.Status,
+				})
+			}
+
+			return c.JSON(response)
+		})
+
 		group.Post("/", func(c *fiber.Ctx) error {
 			definition := ParticipantDefinition{
 				KubernetesIngressHost: "localhost",
@@ -125,9 +230,14 @@ func main() {
 				participantDeploymentNames,
 				func() {
 					onDeploymentReady(definition)
+					// Invalidate cache when deployment is ready
+					statusChecker.InvalidateCache(definition.ParticipantName)
 				},
 			)
 
+			// Invalidate cache after creating resources
+			statusChecker.InvalidateCache(definition.ParticipantName)
+
 			return c.JSON(mergedResources)
 		})
 
@@ -154,6 +264,9 @@ func main() {
 				mergedResources[k] = v
 			}
 
+			// Invalidate cache after deleting resources
+			statusChecker.InvalidateCache(request.ParticipantName)
+
 			return c.JSON(mergedResources)
 		})
 	}