diff --git a/kubernetes/apis/sandbox/v1alpha1/batchsandbox_types.go b/kubernetes/apis/sandbox/v1alpha1/batchsandbox_types.go index edfab19b2..d79f01ee6 100644 --- a/kubernetes/apis/sandbox/v1alpha1/batchsandbox_types.go +++ b/kubernetes/apis/sandbox/v1alpha1/batchsandbox_types.go @@ -41,7 +41,7 @@ const ( ) // BatchSandboxConditionType represents the type of BatchSandbox condition. -// +kubebuilder:validation:Enum=Ready;Progressing;Paused;PauseFailed;ResumeFailed;PodFailed +// +kubebuilder:validation:Enum=Ready;Progressing;Paused;PauseFailed;ResumeFailed;PodFailed;InvalidShardPatch type BatchSandboxConditionType string const ( @@ -57,6 +57,9 @@ const ( BatchSandboxConditionResumeFailed BatchSandboxConditionType = "ResumeFailed" // BatchSandboxConditionPodFailed is set when the sandbox pod enters a failed state. BatchSandboxConditionPodFailed BatchSandboxConditionType = "PodFailed" + // BatchSandboxConditionInvalidShardPatch is set when a shardTaskPatch cannot be merged + // into TaskTemplateSpec due to a type mismatch or invalid JSON structure. + BatchSandboxConditionInvalidShardPatch BatchSandboxConditionType = "InvalidShardPatch" ) // BatchSandboxCondition represents a condition of a BatchSandbox diff --git a/kubernetes/config/crd/bases/sandbox.opensandbox.io_batchsandboxes.yaml b/kubernetes/config/crd/bases/sandbox.opensandbox.io_batchsandboxes.yaml index 0d305da04..940cb4e24 100644 --- a/kubernetes/config/crd/bases/sandbox.opensandbox.io_batchsandboxes.yaml +++ b/kubernetes/config/crd/bases/sandbox.opensandbox.io_batchsandboxes.yaml @@ -177,6 +177,7 @@ spec: - PauseFailed - ResumeFailed - PodFailed + - InvalidShardPatch type: string required: - status diff --git a/kubernetes/internal/controller/batchsandbox_controller.go b/kubernetes/internal/controller/batchsandbox_controller.go index 63e76c98a..b461f5df6 100644 --- a/kubernetes/internal/controller/batchsandbox_controller.go +++ b/kubernetes/internal/controller/batchsandbox_controller.go @@ -146,6 +146,33 @@ func (r *BatchSandboxReconciler) Reconcile(ctx context.Context, req ctrl.Request // handle finalizers if batchSbx.DeletionTimestamp == nil { if taskStrategy.NeedTaskScheduling() { + // Validate shardTaskPatches early so that a type-mismatch is surfaced + // immediately as a status condition rather than causing a silent merge + // failure deep in the reconcile loop. + if patchErr := taskStrategy.ValidateShardTaskPatches(); patchErr != nil { + log.Error(patchErr, "invalid shardTaskPatches detected, updating status condition") + statusCopy := batchSbx.Status.DeepCopy() + setConditionInStatus(statusCopy, sandboxv1alpha1.BatchSandboxConditionInvalidShardPatch, + sandboxv1alpha1.ConditionTrue, "InvalidShardPatch", patchErr.Error()) + if statusErr := r.updateStatus(ctx, batchSbx, statusCopy); statusErr != nil { + log.Error(statusErr, "failed to update status with InvalidShardPatch condition") + return ctrl.Result{}, statusErr + } + // Return without error so the controller does not keep requeueing; + // the resource must be corrected by the user. + return ctrl.Result{}, nil + } + // Patches are valid — clear any stale InvalidShardPatch condition so users + // know the fix was accepted. + if hasCondition(batchSbx, sandboxv1alpha1.BatchSandboxConditionInvalidShardPatch) { + statusCopy := batchSbx.Status.DeepCopy() + setConditionInStatus(statusCopy, sandboxv1alpha1.BatchSandboxConditionInvalidShardPatch, + sandboxv1alpha1.ConditionFalse, "", "") + if statusErr := r.updateStatus(ctx, batchSbx, statusCopy); statusErr != nil { + log.Error(statusErr, "failed to clear InvalidShardPatch condition") + return ctrl.Result{}, statusErr + } + } if !controllerutil.ContainsFinalizer(batchSbx, FinalizerTaskCleanup) { err := utils.UpdateFinalizer(r.Client, batchSbx, utils.AddFinalizerOpType, FinalizerTaskCleanup) if err != nil { @@ -160,6 +187,32 @@ func (r *BatchSandboxReconciler) Reconcile(ctx context.Context, req ctrl.Request if !taskStrategy.NeedTaskScheduling() { return ctrl.Result{}, nil } + // If shardTaskPatches are invalid we cannot build task specs, but we must + // not leave the resource stuck in Terminating. Clear the finalizer now and + // record the error in status so the deletion can proceed. + if patchErr := taskStrategy.ValidateShardTaskPatches(); patchErr != nil { + log.Error(patchErr, "invalid shardTaskPatches on deletion path, clearing finalizer to unblock termination") + statusCopy := batchSbx.Status.DeepCopy() + setConditionInStatus(statusCopy, sandboxv1alpha1.BatchSandboxConditionInvalidShardPatch, + sandboxv1alpha1.ConditionTrue, "InvalidShardPatch", patchErr.Error()) + if statusErr := r.updateStatus(ctx, batchSbx, statusCopy); statusErr != nil { + log.Error(statusErr, "failed to update status with InvalidShardPatch condition on deletion path") + return ctrl.Result{}, statusErr + } + // Always clean up the in-memory scheduler regardless of whether the finalizer + // is still present, so stale schedulers are not left behind. + r.deleteTaskScheduler(ctx, batchSbx) + if controllerutil.ContainsFinalizer(batchSbx, FinalizerTaskCleanup) { + if err := utils.UpdateFinalizer(r.Client, batchSbx, utils.RemoveFinalizerOpType, FinalizerTaskCleanup); err != nil { + if !errors.IsNotFound(err) { + log.Error(err, "failed to remove finalizer during invalid-patch deletion", "finalizer", FinalizerTaskCleanup) + return ctrl.Result{}, err + } + } + log.Info("removed finalizer due to invalid shardTaskPatches, resource can now be deleted") + } + return ctrl.Result{}, nil + } } // Pause/Resume dispatch: handles pause/resume intent before normal scaling. diff --git a/kubernetes/internal/controller/batchsandbox_pause_resume_test.go b/kubernetes/internal/controller/batchsandbox_pause_resume_test.go index 29f793002..1234eabad 100644 --- a/kubernetes/internal/controller/batchsandbox_pause_resume_test.go +++ b/kubernetes/internal/controller/batchsandbox_pause_resume_test.go @@ -26,6 +26,7 @@ import ( corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "k8s.io/utils/ptr" @@ -33,6 +34,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/client/interceptor" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" sandboxv1alpha1 "github.com/alibaba/OpenSandbox/sandbox-k8s/apis/sandbox/v1alpha1" taskscheduler "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/scheduler" @@ -2724,3 +2726,123 @@ func TestAckPauseWithPhase_DoesNotMutateSpecPause(t *testing.T) { // Ensure ctrl.Result type is used var _ = ctrl.Result{} + +// ---------- Issue 1019: shardTaskPatches validation and finalizer unblock ---------- + +// TestReconcile_InvalidShardTaskPatches_SetsConditionAndDoesNotProceed verifies that when +// a BatchSandbox has invalid shardTaskPatches (e.g. args as an integer instead of a string +// array), the reconciler surfaces an InvalidShardPatch condition in status and returns +// without adding a finalizer or progressing further. +func TestReconcile_InvalidShardTaskPatches_SetsConditionAndDoesNotProceed(t *testing.T) { + bs := &sandboxv1alpha1.BatchSandbox{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-bs-invalid-patch", + Namespace: "default", + }, + Spec: sandboxv1alpha1.BatchSandboxSpec{ + Replicas: ptr.To(int32(1)), + Template: &corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "main", Image: "img"}}, + }, + }, + TaskTemplate: &sandboxv1alpha1.TaskTemplateSpec{ + Spec: sandboxv1alpha1.TaskSpec{ + Process: &sandboxv1alpha1.ProcessTask{ + Command: []string{"sleep"}, + Args: []string{"infinite"}, + }, + }, + }, + // args is an integer, not a string array — invalid against TaskSpec schema. + ShardTaskPatches: []runtime.RawExtension{ + {Raw: []byte(`{"spec":{"process":{"args":3600}}}`)}, + }, + }, + } + r := newTestReconciler(bs) + + result, err := r.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: types.NamespacedName{Namespace: "default", Name: "test-bs-invalid-patch"}, + }) + require.NoError(t, err, "reconcile should not return an error for invalid user input") + assert.Equal(t, ctrl.Result{}, result, "reconcile should return an empty result (no requeue)") + + // Verify InvalidShardPatch condition is set in status. + updated := &sandboxv1alpha1.BatchSandbox{} + require.NoError(t, r.Get(context.Background(), types.NamespacedName{Namespace: "default", Name: "test-bs-invalid-patch"}, updated)) + + var invalidPatchCond *sandboxv1alpha1.BatchSandboxCondition + for i := range updated.Status.Conditions { + if updated.Status.Conditions[i].Type == sandboxv1alpha1.BatchSandboxConditionInvalidShardPatch { + invalidPatchCond = &updated.Status.Conditions[i] + break + } + } + require.NotNil(t, invalidPatchCond, "InvalidShardPatch condition should be set") + assert.Equal(t, sandboxv1alpha1.ConditionTrue, invalidPatchCond.Status) + assert.Equal(t, "InvalidShardPatch", invalidPatchCond.Reason) + + // Verify that the finalizer was NOT added (reconcile should have bailed before that). + assert.False(t, controllerutil.ContainsFinalizer(updated, FinalizerTaskCleanup), + "finalizer should not have been added when shardTaskPatches are invalid") +} + +// TestReconcile_InvalidShardTaskPatches_OnDeletion_ClearsFinalizer verifies that when a +// BatchSandbox with an invalid shardTaskPatch is being deleted, the reconciler clears the +// finalizer so the resource is not stuck in Terminating forever. +func TestReconcile_InvalidShardTaskPatches_OnDeletion_ClearsFinalizer(t *testing.T) { + now := metav1.Now() + bs := &sandboxv1alpha1.BatchSandbox{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-bs-delete-invalid", + Namespace: "default", + DeletionTimestamp: &now, + // fake client requires non-zero ResourceVersion for objects with DeletionTimestamp. + ResourceVersion: "1", + Finalizers: []string{FinalizerTaskCleanup}, + }, + Spec: sandboxv1alpha1.BatchSandboxSpec{ + Replicas: ptr.To(int32(1)), + Template: &corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{Name: "main", Image: "img"}}, + }, + }, + TaskTemplate: &sandboxv1alpha1.TaskTemplateSpec{ + Spec: sandboxv1alpha1.TaskSpec{ + Process: &sandboxv1alpha1.ProcessTask{ + Command: []string{"sleep"}, + Args: []string{"infinite"}, + }, + }, + }, + // args is an integer, not a string array — invalid against TaskSpec schema. + ShardTaskPatches: []runtime.RawExtension{ + {Raw: []byte(`{"spec":{"process":{"args":3600}}}`)}, + }, + }, + } + r := newTestReconciler(bs) + + result, err := r.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: types.NamespacedName{Namespace: "default", Name: "test-bs-delete-invalid"}, + }) + require.NoError(t, err, "reconcile should not return an error on deletion with invalid patches") + assert.Equal(t, ctrl.Result{}, result) + + // After the finalizer is removed from a terminating object, the fake client + // garbage-collects the resource immediately (same behaviour as the real API server). + // A "not found" response confirms the finalizer was cleared and deletion unblocked. + updated := &sandboxv1alpha1.BatchSandbox{} + getErr := r.Get(context.Background(), types.NamespacedName{Namespace: "default", Name: "test-bs-delete-invalid"}, updated) + if getErr == nil { + // Resource still visible (e.g. fake client version differences) — confirm finalizer gone. + assert.False(t, controllerutil.ContainsFinalizer(updated, FinalizerTaskCleanup), + "finalizer should have been removed so the resource is no longer stuck in Terminating") + } else { + // Resource already garbage-collected — this is the expected happy path. + require.True(t, apierrors.IsNotFound(getErr), + "expected resource to be deleted after finalizer removal, got unexpected error: %v", getErr) + } +} diff --git a/kubernetes/internal/controller/batchsandbox_status.go b/kubernetes/internal/controller/batchsandbox_status.go index 0427df935..4a0cfa994 100644 --- a/kubernetes/internal/controller/batchsandbox_status.go +++ b/kubernetes/internal/controller/batchsandbox_status.go @@ -38,6 +38,16 @@ type runtimeView struct { resumeCompleted bool } +// hasCondition returns true when the BatchSandbox has a condition of the given type set to True. +func hasCondition(sbx *sandboxv1alpha1.BatchSandbox, condType sandboxv1alpha1.BatchSandboxConditionType) bool { + for _, c := range sbx.Status.Conditions { + if c.Type == condType && c.Status == sandboxv1alpha1.ConditionTrue { + return true + } + } + return false +} + func setConditionInStatus( status *sandboxv1alpha1.BatchSandboxStatus, conditionType sandboxv1alpha1.BatchSandboxConditionType, diff --git a/kubernetes/internal/controller/strategy/task_scheduling_strategy.go b/kubernetes/internal/controller/strategy/task_scheduling_strategy.go index d973fd787..bae576eda 100644 --- a/kubernetes/internal/controller/strategy/task_scheduling_strategy.go +++ b/kubernetes/internal/controller/strategy/task_scheduling_strategy.go @@ -25,6 +25,10 @@ type TaskSchedulingStrategy interface { // NeedTaskScheduling determines whether the BatchSandbox requires task scheduling. NeedTaskScheduling() bool + // ValidateShardTaskPatches checks that all shardTaskPatches can be merged into + // a TaskTemplateSpec without type errors. Returns nil when all patches are valid. + ValidateShardTaskPatches() error + // GenerateTaskSpecs generates the complete list of task specifications for the BatchSandbox. GenerateTaskSpecs() ([]*api.Task, error) } diff --git a/kubernetes/internal/controller/strategy/task_scheduling_strategy_default.go b/kubernetes/internal/controller/strategy/task_scheduling_strategy_default.go index 4cf02b63d..f7a6ddbd9 100644 --- a/kubernetes/internal/controller/strategy/task_scheduling_strategy_default.go +++ b/kubernetes/internal/controller/strategy/task_scheduling_strategy_default.go @@ -41,6 +41,36 @@ func (s *DefaultTaskSchedulingStrategy) NeedTaskScheduling() bool { return s.Spec.TaskTemplate != nil } +// ValidateShardTaskPatches checks that every shardTaskPatch can be successfully +// merged into a zero-value TaskTemplateSpec. It returns the first error encountered, +// including the patch index and the raw patch bytes to aid diagnosis. +func (s *DefaultTaskSchedulingStrategy) ValidateShardTaskPatches() error { + if len(s.Spec.ShardTaskPatches) == 0 { + return nil + } + // Zero-value base: we check structural correctness in isolation, not against any + // specific user template, so a bad patch is caught even when TaskTemplate is nil. + zeroBytes, err := json.Marshal(&sandboxv1alpha1.TaskTemplateSpec{}) + if err != nil { + return fmt.Errorf("batchsandbox: failed to marshal zero TaskTemplateSpec: %w", err) + } + for i, patch := range s.Spec.ShardTaskPatches { + // Truncate patch in error messages to avoid persisting large blobs in status conditions. + patchSummary := patch.Raw + if len(patchSummary) > 200 { + patchSummary = append(patchSummary[:200], []byte("...(truncated)")...) + } + modified, mergeErr := strategicpatch.StrategicMergePatch(zeroBytes, patch.Raw, &sandboxv1alpha1.TaskTemplateSpec{}) + if mergeErr != nil { + return fmt.Errorf("batchsandbox: shardTaskPatches[%d] failed schema validation: patch %s, err %w", i, patchSummary, mergeErr) + } + if err = json.Unmarshal(modified, &sandboxv1alpha1.TaskTemplateSpec{}); err != nil { + return fmt.Errorf("batchsandbox: shardTaskPatches[%d] produced invalid TaskTemplateSpec: patch %s, err %w", i, patchSummary, err) + } + } + return nil +} + // GenerateTaskSpecs generates task specifications for all replicas. func (s *DefaultTaskSchedulingStrategy) GenerateTaskSpecs() ([]*api.Task, error) { ret := make([]*api.Task, *s.Spec.Replicas) diff --git a/kubernetes/internal/controller/strategy/task_scheduling_strategy_default_test.go b/kubernetes/internal/controller/strategy/task_scheduling_strategy_default_test.go index d9c8aa089..a42f0dd03 100644 --- a/kubernetes/internal/controller/strategy/task_scheduling_strategy_default_test.go +++ b/kubernetes/internal/controller/strategy/task_scheduling_strategy_default_test.go @@ -25,6 +25,82 @@ import ( api "github.com/alibaba/OpenSandbox/sandbox-k8s/pkg/task-executor" ) +func TestDefaultTaskSchedulingStrategy_ValidateShardTaskPatches(t *testing.T) { + tests := []struct { + name string + batchSbx *sandboxv1alpha1.BatchSandbox + wantErr bool + }{ + { + name: "no shard task patches - valid", + batchSbx: &sandboxv1alpha1.BatchSandbox{ + Spec: sandboxv1alpha1.BatchSandboxSpec{ + TaskTemplate: &sandboxv1alpha1.TaskTemplateSpec{}, + }, + }, + wantErr: false, + }, + { + name: "valid shard task patch with string args", + batchSbx: &sandboxv1alpha1.BatchSandbox{ + Spec: sandboxv1alpha1.BatchSandboxSpec{ + TaskTemplate: &sandboxv1alpha1.TaskTemplateSpec{}, + ShardTaskPatches: []runtime.RawExtension{ + {Raw: []byte(`{"spec":{"process":{"command":["sleep"],"args":["3600"]}}}`)}, + }, + }, + }, + wantErr: false, + }, + { + name: "invalid shard task patch - args as integer instead of string array", + batchSbx: &sandboxv1alpha1.BatchSandbox{ + Spec: sandboxv1alpha1.BatchSandboxSpec{ + TaskTemplate: &sandboxv1alpha1.TaskTemplateSpec{}, + ShardTaskPatches: []runtime.RawExtension{ + {Raw: []byte(`{"spec":{"process":{"args":3600}}}`)}, + }, + }, + }, + wantErr: true, + }, + { + name: "invalid shard task patch - malformed JSON", + batchSbx: &sandboxv1alpha1.BatchSandbox{ + Spec: sandboxv1alpha1.BatchSandboxSpec{ + TaskTemplate: &sandboxv1alpha1.TaskTemplateSpec{}, + ShardTaskPatches: []runtime.RawExtension{ + {Raw: []byte(`{"invalid json`)}, + }, + }, + }, + wantErr: true, + }, + { + name: "second patch invalid - returns error with correct index", + batchSbx: &sandboxv1alpha1.BatchSandbox{ + Spec: sandboxv1alpha1.BatchSandboxSpec{ + TaskTemplate: &sandboxv1alpha1.TaskTemplateSpec{}, + ShardTaskPatches: []runtime.RawExtension{ + {Raw: []byte(`{"spec":{"process":{"args":["valid"]}}}`)}, + {Raw: []byte(`{"spec":{"process":{"args":3600}}}`)}, + }, + }, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := NewDefaultTaskSchedulingStrategy(tt.batchSbx) + err := s.ValidateShardTaskPatches() + if (err != nil) != tt.wantErr { + t.Errorf("ValidateShardTaskPatches() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + func TestDefaultTaskSchedulingStrategy_NeedTaskScheduling(t *testing.T) { tests := []struct { name string