Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions internal/locator/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package locator

import (
"errors"
"fmt"
)

// ErrorType classifies a locator failure so that callers can pick a retry
// strategy from the kind of error instead of parsing its message.
type ErrorType int

const (
	// ErrorTypeUnknown is the zero value of ErrorType.
	ErrorTypeUnknown ErrorType = iota

	// --- Transient errors: retry with backoff. ---

	// ErrorTypeNetworkTransient covers network timeouts and connection resets.
	ErrorTypeNetworkTransient
	// ErrorTypeAPITransient covers API timeouts and server errors (5xx).
	ErrorTypeAPITransient

	// --- Permanent errors: fail fast, or give up after a few retries. ---

	// ErrorTypeResourceNotFound means the Pod, Service or Deployment does not exist.
	ErrorTypeResourceNotFound
	// ErrorTypePodNotRunning means the Pod exists but is not in the Running state.
	ErrorTypePodNotRunning
	// ErrorTypePodFailed means the Pod is in the Failed state.
	ErrorTypePodFailed
	// ErrorTypeConfigInvalid means the configuration (port, selector, etc.) is invalid.
	ErrorTypeConfigInvalid
	// ErrorTypePermissionDenied means access to the resource is not permitted.
	ErrorTypePermissionDenied
	// ErrorTypeNoPodAvailable means the resource has no running pods available;
	// callers might want to retry this one for longer.
	ErrorTypeNoPodAvailable
)

// LocateError wraps a location failure with an ErrorType so that callers can
// make retry decisions based on the kind of failure.
type LocateError struct {
	// Type is the classification of the failure.
	Type ErrorType
	// Message is the human-readable description of what went wrong.
	Message string
	// Err is the underlying cause; it may be nil.
	Err error
}

// Error implements the error interface.
//
// The result is "Message: Err" when both are set. If only one of the two is
// present it is returned on its own, so an empty Message never yields a
// dangling ": " prefix.
func (e *LocateError) Error() string {
	switch {
	case e.Err == nil:
		return e.Message
	case e.Message == "":
		return e.Err.Error()
	default:
		return fmt.Sprintf("%s: %v", e.Message, e.Err)
	}
}

// Unwrap returns the underlying cause (possibly nil) so that errors.Is and
// errors.As can walk through a LocateError.
func (e *LocateError) Unwrap() error {
	return e.Err
}

// IsLocateError reports whether err, or any error in its unwrap chain, is a
// *LocateError.
func IsLocateError(err error) bool {
	// errors.As needs a pointer to a variable of the target type; the value it
	// stores there is not needed here, only the match result.
	target := new(*LocateError)
	return errors.As(err, target)
}

// GetErrorType returns the ErrorType of the first *LocateError found in err's
// unwrap chain. It returns ErrorTypeUnknown when err is nil, when the chain
// contains no *LocateError, or when the matched *LocateError is a nil pointer.
func GetErrorType(err error) ErrorType {
	var locErr *LocateError
	// errors.As also matches a typed-nil *LocateError stored in an error
	// interface; the nil check keeps the field access below from panicking.
	if !errors.As(err, &locErr) || locErr == nil {
		return ErrorTypeUnknown
	}
	return locErr.Type
}

// NewResourceNotFoundError builds a *LocateError of type
// ErrorTypeResourceNotFound whose message reads "<resourceType> <name> not found".
// err is stored as the underlying cause and may be nil.
func NewResourceNotFoundError(resourceType, name string, err error) error {
	locErr := new(LocateError)
	locErr.Type = ErrorTypeResourceNotFound
	locErr.Message = fmt.Sprintf("%s %s not found", resourceType, name)
	locErr.Err = err
	return locErr
}

// NewPodNotRunningError builds a *LocateError of type ErrorTypePodNotRunning
// that names the pod and the phase it was observed in. err is stored as the
// underlying cause and may be nil.
func NewPodNotRunningError(podName, phase string, err error) error {
	description := fmt.Sprintf("pod %s is not running (phase: %s)", podName, phase)
	return &LocateError{Type: ErrorTypePodNotRunning, Message: description, Err: err}
}

// NewPodFailedError builds a *LocateError of type ErrorTypePodFailed for the
// named pod. err is stored as the underlying cause and may be nil.
func NewPodFailedError(podName string, err error) error {
	var locErr LocateError
	locErr.Type = ErrorTypePodFailed
	locErr.Message = fmt.Sprintf("pod %s is in failed state", podName)
	locErr.Err = err
	return &locErr
}

// NewConfigInvalidError builds a *LocateError of type ErrorTypeConfigInvalid.
// msg is used verbatim as the message; err is stored as the underlying cause
// and may be nil.
func NewConfigInvalidError(msg string, err error) error {
	var locErr LocateError
	locErr.Type, locErr.Message, locErr.Err = ErrorTypeConfigInvalid, msg, err
	return &locErr
}

// NewPermissionDeniedError builds a *LocateError of type
// ErrorTypePermissionDenied describing which operation was refused on which
// resource. err is stored as the underlying cause and may be nil.
func NewPermissionDeniedError(operation, resource string, err error) error {
	locErr := &LocateError{Type: ErrorTypePermissionDenied, Err: err}
	locErr.Message = fmt.Sprintf("permission denied: cannot %s %s", operation, resource)
	return locErr
}

// NewAPITransientError builds a *LocateError of type ErrorTypeAPITransient.
// msg is used verbatim as the message; err is stored as the underlying cause
// and may be nil.
func NewAPITransientError(msg string, err error) error {
	locErr := new(LocateError)
	locErr.Type = ErrorTypeAPITransient
	locErr.Message = msg
	locErr.Err = err
	return locErr
}

// NewNetworkTransientError builds a *LocateError of type
// ErrorTypeNetworkTransient. msg is used verbatim as the message; err is
// stored as the underlying cause and may be nil.
func NewNetworkTransientError(msg string, err error) error {
	locErr := &LocateError{Type: ErrorTypeNetworkTransient}
	locErr.Message, locErr.Err = msg, err
	return locErr
}
14 changes: 10 additions & 4 deletions internal/locator/locator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestPodLocatorNotFound(t *testing.T) {
_, _, err = locator.Locate(context.Background())

assert.Error(t, err)
assert.Contains(t, err.Error(), "failed to get pod")
assert.Contains(t, err.Error(), "not found")
assert.Contains(t, err.Error(), "nonexistent")
}

Expand Down Expand Up @@ -86,8 +86,14 @@ func TestPodLocatorNotRunning(t *testing.T) {
_, _, err = locator.Locate(context.Background())

assert.Error(t, err)
assert.Contains(t, err.Error(), "not running")
assert.Contains(t, err.Error(), string(tc.phase))
// For failed pods, we have a specific error; for others, we say "not running"
errMsg := err.Error()
if tc.phase == corev1.PodFailed {
assert.Contains(t, errMsg, "failed state")
} else {
assert.Contains(t, errMsg, "not running")
assert.Contains(t, errMsg, string(tc.phase))
}
})
}
}
Expand Down Expand Up @@ -143,7 +149,7 @@ func TestServiceLocatorNotFound(t *testing.T) {
_, _, err = locator.Locate(context.Background())

assert.Error(t, err)
assert.Contains(t, err.Error(), "failed to get service")
assert.Contains(t, err.Error(), "not found")
}

// TestServiceLocatorNoRunningPods tests error when service has no running pods
Expand Down
21 changes: 19 additions & 2 deletions internal/locator/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
Expand All @@ -31,11 +32,27 @@ func NewPodLocator(podName string, namespace string, ports []string, client kube
func (l *PodLocator) Locate(ctx context.Context) (string, []string, error) {
pod, err := l.client.CoreV1().Pods(l.namespace).Get(ctx, l.podName, metav1.GetOptions{})
if err != nil {
return "", []string{}, fmt.Errorf("failed to get pod %s: %w", l.podName, err)
// Classify API errors
if apierrors.IsNotFound(err) {
return "", []string{}, NewResourceNotFoundError("pod", l.podName, err)
}
if apierrors.IsTimeout(err) || apierrors.IsServerTimeout(err) {
return "", []string{}, NewAPITransientError(fmt.Sprintf("API timeout getting pod %s", l.podName), err)
}
if apierrors.IsForbidden(err) || apierrors.IsUnauthorized(err) {
return "", []string{}, NewPermissionDeniedError("get", fmt.Sprintf("pod %s", l.podName), err)
}
// Other API errors (network issues, etc.)
return "", []string{}, NewAPITransientError(fmt.Sprintf("failed to get pod %s", l.podName), err)
}

// Check pod status
if pod.Status.Phase == corev1.PodFailed {
return "", []string{}, NewPodFailedError(l.podName, nil)
}

if pod.Status.Phase != corev1.PodRunning {
return "", []string{}, fmt.Errorf("pod %s is not running (phase: %s)", l.podName, pod.Status.Phase)
return "", []string{}, NewPodNotRunningError(l.podName, string(pod.Status.Phase), nil)
}

return l.podName, l.ports, nil
Expand Down
48 changes: 39 additions & 9 deletions internal/locator/selector_based_locator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -44,7 +45,14 @@ func (l *SelectorBasedLocator) Locate(ctx context.Context) (string, []string, er
LabelSelector: labelSelector.String(),
})
if err != nil {
return "", []string{}, fmt.Errorf("failed to list pods for %s %s: %w", l.resourceType, l.resourceName, err)
// Classify API errors
if apierrors.IsTimeout(err) || apierrors.IsServerTimeout(err) {
return "", []string{}, NewAPITransientError(fmt.Sprintf("API timeout listing pods for %s %s", l.resourceType, l.resourceName), err)
}
if apierrors.IsForbidden(err) || apierrors.IsUnauthorized(err) {
return "", []string{}, NewPermissionDeniedError("list", fmt.Sprintf("pods for %s %s", l.resourceType, l.resourceName), err)
}
return "", []string{}, NewAPITransientError(fmt.Sprintf("failed to list pods for %s %s", l.resourceType, l.resourceName), err)
}

// Find the first running pod
Expand All @@ -54,7 +62,11 @@ func (l *SelectorBasedLocator) Locate(ctx context.Context) (string, []string, er
}
}

return "", []string{}, fmt.Errorf("no running pod found for %s %s", l.resourceType, l.resourceName)
return "", []string{}, &LocateError{
Type: ErrorTypeNoPodAvailable,
Message: fmt.Sprintf("no running pod found for %s %s", l.resourceType, l.resourceName),
Err: nil,
}
}

// getSelector retrieves the label selector for the resource based on its type.
Expand All @@ -67,19 +79,25 @@ func (l *SelectorBasedLocator) getSelector(ctx context.Context) (labels.Selector
case "daemonset", "ds":
return l.getDaemonSetSelector(ctx)
default:
return nil, fmt.Errorf("unsupported resource type: %s", l.resourceType)
return nil, NewConfigInvalidError(fmt.Sprintf("unsupported resource type: %s", l.resourceType), nil)
}
}

// getDeploymentSelector retrieves the selector from a Deployment.
func (l *SelectorBasedLocator) getDeploymentSelector(ctx context.Context) (labels.Selector, error) {
deployment, err := l.client.AppsV1().Deployments(l.namespace).Get(ctx, l.resourceName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get deployment %s: %w", l.resourceName, err)
if apierrors.IsNotFound(err) {
return nil, NewResourceNotFoundError("deployment", l.resourceName, err)
}
if apierrors.IsTimeout(err) || apierrors.IsServerTimeout(err) {
return nil, NewAPITransientError(fmt.Sprintf("API timeout getting deployment %s", l.resourceName), err)
}
return nil, NewAPITransientError(fmt.Sprintf("failed to get deployment %s", l.resourceName), err)
}

if deployment.Spec.Selector == nil {
return nil, fmt.Errorf("deployment %s has no selector", l.resourceName)
return nil, NewConfigInvalidError(fmt.Sprintf("deployment %s has no selector", l.resourceName), nil)
}

return labels.Set(deployment.Spec.Selector.MatchLabels).AsSelector(), nil
Expand All @@ -89,11 +107,17 @@ func (l *SelectorBasedLocator) getDeploymentSelector(ctx context.Context) (label
func (l *SelectorBasedLocator) getStatefulSetSelector(ctx context.Context) (labels.Selector, error) {
statefulSet, err := l.client.AppsV1().StatefulSets(l.namespace).Get(ctx, l.resourceName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get statefulset %s: %w", l.resourceName, err)
if apierrors.IsNotFound(err) {
return nil, NewResourceNotFoundError("statefulset", l.resourceName, err)
}
if apierrors.IsTimeout(err) || apierrors.IsServerTimeout(err) {
return nil, NewAPITransientError(fmt.Sprintf("API timeout getting statefulset %s", l.resourceName), err)
}
return nil, NewAPITransientError(fmt.Sprintf("failed to get statefulset %s", l.resourceName), err)
}

if statefulSet.Spec.Selector == nil {
return nil, fmt.Errorf("statefulset %s has no selector", l.resourceName)
return nil, NewConfigInvalidError(fmt.Sprintf("statefulset %s has no selector", l.resourceName), nil)
}

return labels.Set(statefulSet.Spec.Selector.MatchLabels).AsSelector(), nil
Expand All @@ -103,11 +127,17 @@ func (l *SelectorBasedLocator) getStatefulSetSelector(ctx context.Context) (labe
func (l *SelectorBasedLocator) getDaemonSetSelector(ctx context.Context) (labels.Selector, error) {
daemonSet, err := l.client.AppsV1().DaemonSets(l.namespace).Get(ctx, l.resourceName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get daemonset %s: %w", l.resourceName, err)
if apierrors.IsNotFound(err) {
return nil, NewResourceNotFoundError("daemonset", l.resourceName, err)
}
if apierrors.IsTimeout(err) || apierrors.IsServerTimeout(err) {
return nil, NewAPITransientError(fmt.Sprintf("API timeout getting daemonset %s", l.resourceName), err)
}
return nil, NewAPITransientError(fmt.Sprintf("failed to get daemonset %s", l.resourceName), err)
}

if daemonSet.Spec.Selector == nil {
return nil, fmt.Errorf("daemonset %s has no selector", l.resourceName)
return nil, NewConfigInvalidError(fmt.Sprintf("daemonset %s has no selector", l.resourceName), nil)
}

return labels.Set(daemonSet.Spec.Selector.MatchLabels).AsSelector(), nil
Expand Down
37 changes: 30 additions & 7 deletions internal/locator/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/intstr"
Expand Down Expand Up @@ -36,7 +37,17 @@ func NewServiceLocator(svcName string, namespace string, ports []string, client
func (l *ServiceLocator) Locate(ctx context.Context) (string, []string, error) {
svc, err := l.client.CoreV1().Services(l.namespace).Get(ctx, l.svcName, metav1.GetOptions{})
if err != nil {
return "", []string{}, fmt.Errorf("failed to get service %s: %w", l.svcName, err)
// Classify API errors
if apierrors.IsNotFound(err) {
return "", []string{}, NewResourceNotFoundError("service", l.svcName, err)
}
if apierrors.IsTimeout(err) || apierrors.IsServerTimeout(err) {
return "", []string{}, NewAPITransientError(fmt.Sprintf("API timeout getting service %s", l.svcName), err)
}
if apierrors.IsForbidden(err) || apierrors.IsUnauthorized(err) {
return "", []string{}, NewPermissionDeniedError("get", fmt.Sprintf("service %s", l.svcName), err)
}
return "", []string{}, NewAPITransientError(fmt.Sprintf("failed to get service %s", l.svcName), err)
}

labelSelector := labels.Set(svc.Spec.Selector).AsSelector()
Expand All @@ -45,7 +56,14 @@ func (l *ServiceLocator) Locate(ctx context.Context) (string, []string, error) {
LabelSelector: labelSelector.String(),
})
if err != nil {
return "", []string{}, fmt.Errorf("failed to list pods for service %s: %w", l.svcName, err)
// Classify API errors
if apierrors.IsTimeout(err) || apierrors.IsServerTimeout(err) {
return "", []string{}, NewAPITransientError(fmt.Sprintf("API timeout listing pods for service %s", l.svcName), err)
}
if apierrors.IsForbidden(err) || apierrors.IsUnauthorized(err) {
return "", []string{}, NewPermissionDeniedError("list", fmt.Sprintf("pods for service %s", l.svcName), err)
}
return "", []string{}, NewAPITransientError(fmt.Sprintf("failed to list pods for service %s", l.svcName), err)
}

for _, p := range pods.Items {
Expand All @@ -59,7 +77,12 @@ func (l *ServiceLocator) Locate(ctx context.Context) (string, []string, error) {
}
}

return "", []string{}, fmt.Errorf("no running pod found for service %s", l.svcName)
// No running pods found for service
return "", []string{}, &LocateError{
Type: ErrorTypeNoPodAvailable,
Message: fmt.Sprintf("no running pod found for service %s", l.svcName),
Err: nil,
}
}

// mapPorts translates service ports to pod container ports.
Expand All @@ -72,22 +95,22 @@ func (l *ServiceLocator) mapPorts(svc *corev1.Service, pod *corev1.Pod) ([]strin

srcPort, err := strconv.Atoi(parts[0])
if err != nil {
return []string{}, fmt.Errorf("invalid local port %s: %w", parts[0], err)
return []string{}, NewConfigInvalidError(fmt.Sprintf("invalid local port %s", parts[0]), err)
}

dstPort := srcPort
if len(parts) > 1 {
dstPort, err = strconv.Atoi(parts[1])
if err != nil {
return []string{}, fmt.Errorf("invalid remote port %s: %w", parts[1], err)
return []string{}, NewConfigInvalidError(fmt.Sprintf("invalid remote port %s", parts[1]), err)
}
}

sp, ok := lo.Find(svc.Spec.Ports, func(p corev1.ServicePort) bool {
return p.Port == int32(dstPort)
})
if !ok {
return []string{}, fmt.Errorf("service %s does not expose port %d", svc.Name, dstPort)
return []string{}, NewConfigInvalidError(fmt.Sprintf("service %s does not expose port %d", svc.Name, dstPort), nil)
}

if sp.TargetPort.Type == intstr.Int {
Expand All @@ -99,7 +122,7 @@ func (l *ServiceLocator) mapPorts(svc *corev1.Service, pod *corev1.Pod) ([]strin
return sp.TargetPort.StrVal == p.Name
})
if !ok {
return []string{}, fmt.Errorf("pod %s does not have named port %s", pod.Name, sp.TargetPort.StrVal)
return []string{}, NewConfigInvalidError(fmt.Sprintf("pod %s does not have named port %s", pod.Name, sp.TargetPort.StrVal), nil)
}

dstPort = int(pp.ContainerPort)
Expand Down