Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion kubernetes/apis/sandbox/v1alpha1/batchsandbox_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const (
)

// BatchSandboxConditionType represents the type of BatchSandbox condition.
// +kubebuilder:validation:Enum=Ready;Progressing;Paused;PauseFailed;ResumeFailed;PodFailed
// +kubebuilder:validation:Enum=Ready;Progressing;Paused;PauseFailed;ResumeFailed;PodFailed;InvalidShardPatch
Comment thread
ashishpatel26 marked this conversation as resolved.
type BatchSandboxConditionType string

const (
Expand All @@ -57,6 +57,9 @@ const (
BatchSandboxConditionResumeFailed BatchSandboxConditionType = "ResumeFailed"
// BatchSandboxConditionPodFailed is set when the sandbox pod enters a failed state.
BatchSandboxConditionPodFailed BatchSandboxConditionType = "PodFailed"
// BatchSandboxConditionInvalidShardPatch is set when a shardTaskPatch cannot be merged
// into TaskTemplateSpec due to a type mismatch or invalid JSON structure.
BatchSandboxConditionInvalidShardPatch BatchSandboxConditionType = "InvalidShardPatch"
)

// BatchSandboxCondition represents a condition of a BatchSandbox
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ spec:
- PauseFailed
- ResumeFailed
- PodFailed
- InvalidShardPatch
type: string
required:
- status
Expand Down
53 changes: 53 additions & 0 deletions kubernetes/internal/controller/batchsandbox_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,33 @@ func (r *BatchSandboxReconciler) Reconcile(ctx context.Context, req ctrl.Request
// handle finalizers
if batchSbx.DeletionTimestamp == nil {
if taskStrategy.NeedTaskScheduling() {
// Validate shardTaskPatches early so that a type-mismatch is surfaced
// immediately as a status condition rather than causing a silent merge
// failure deep in the reconcile loop.
if patchErr := taskStrategy.ValidateShardTaskPatches(); patchErr != nil {
Comment thread
ashishpatel26 marked this conversation as resolved.
log.Error(patchErr, "invalid shardTaskPatches detected, updating status condition")
statusCopy := batchSbx.Status.DeepCopy()
setConditionInStatus(statusCopy, sandboxv1alpha1.BatchSandboxConditionInvalidShardPatch,
sandboxv1alpha1.ConditionTrue, "InvalidShardPatch", patchErr.Error())
if statusErr := r.updateStatus(ctx, batchSbx, statusCopy); statusErr != nil {
log.Error(statusErr, "failed to update status with InvalidShardPatch condition")
return ctrl.Result{}, statusErr
}
// Return without error so the controller does not keep requeueing;
// the resource must be corrected by the user.
return ctrl.Result{}, nil
}
Comment thread
ashishpatel26 marked this conversation as resolved.
// Patches are valid — clear any stale InvalidShardPatch condition so users
// know the fix was accepted.
if hasCondition(batchSbx, sandboxv1alpha1.BatchSandboxConditionInvalidShardPatch) {
statusCopy := batchSbx.Status.DeepCopy()
setConditionInStatus(statusCopy, sandboxv1alpha1.BatchSandboxConditionInvalidShardPatch,
sandboxv1alpha1.ConditionFalse, "", "")
if statusErr := r.updateStatus(ctx, batchSbx, statusCopy); statusErr != nil {
log.Error(statusErr, "failed to clear InvalidShardPatch condition")
return ctrl.Result{}, statusErr
}
}
if !controllerutil.ContainsFinalizer(batchSbx, FinalizerTaskCleanup) {
err := utils.UpdateFinalizer(r.Client, batchSbx, utils.AddFinalizerOpType, FinalizerTaskCleanup)
if err != nil {
Expand All @@ -160,6 +187,32 @@ func (r *BatchSandboxReconciler) Reconcile(ctx context.Context, req ctrl.Request
if !taskStrategy.NeedTaskScheduling() {
return ctrl.Result{}, nil
}
// If shardTaskPatches are invalid we cannot build task specs, but we must
// not leave the resource stuck in Terminating. Clear the finalizer now and
// record the error in status so the deletion can proceed.
if patchErr := taskStrategy.ValidateShardTaskPatches(); patchErr != nil {
log.Error(patchErr, "invalid shardTaskPatches on deletion path, clearing finalizer to unblock termination")
statusCopy := batchSbx.Status.DeepCopy()
setConditionInStatus(statusCopy, sandboxv1alpha1.BatchSandboxConditionInvalidShardPatch,
sandboxv1alpha1.ConditionTrue, "InvalidShardPatch", patchErr.Error())
if statusErr := r.updateStatus(ctx, batchSbx, statusCopy); statusErr != nil {
log.Error(statusErr, "failed to update status with InvalidShardPatch condition on deletion path")
return ctrl.Result{}, statusErr
}
// Always clean up the in-memory scheduler regardless of whether the finalizer
// is still present, so stale schedulers are not left behind.
r.deleteTaskScheduler(ctx, batchSbx)
if controllerutil.ContainsFinalizer(batchSbx, FinalizerTaskCleanup) {
Comment thread
ashishpatel26 marked this conversation as resolved.
if err := utils.UpdateFinalizer(r.Client, batchSbx, utils.RemoveFinalizerOpType, FinalizerTaskCleanup); err != nil {
if !errors.IsNotFound(err) {
log.Error(err, "failed to remove finalizer during invalid-patch deletion", "finalizer", FinalizerTaskCleanup)
return ctrl.Result{}, err
}
}
log.Info("removed finalizer due to invalid shardTaskPatches, resource can now be deleted")
}
return ctrl.Result{}, nil
}
}

// Pause/Resume dispatch: handles pause/resume intent before normal scaling.
Expand Down
122 changes: 122 additions & 0 deletions kubernetes/internal/controller/batchsandbox_pause_resume_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

sandboxv1alpha1 "github.com/alibaba/OpenSandbox/sandbox-k8s/apis/sandbox/v1alpha1"
taskscheduler "github.com/alibaba/OpenSandbox/sandbox-k8s/internal/scheduler"
Expand Down Expand Up @@ -2724,3 +2726,123 @@ func TestAckPauseWithPhase_DoesNotMutateSpecPause(t *testing.T) {

// Ensure ctrl.Result type is used
var _ = ctrl.Result{}

// ---------- Issue 1019: shardTaskPatches validation and finalizer unblock ----------

// TestReconcile_InvalidShardTaskPatches_SetsConditionAndDoesNotProceed reconciles a
// BatchSandbox whose only shardTaskPatch sets args to an integer instead of a string
// array. It expects Reconcile to succeed with an empty result, to record an
// InvalidShardPatch condition (status True, reason "InvalidShardPatch") on the stored
// object, and to leave the task-cleanup finalizer off the object.
func TestReconcile_InvalidShardTaskPatches_SetsConditionAndDoesNotProceed(t *testing.T) {
	ctx := context.Background()
	key := types.NamespacedName{Namespace: "default", Name: "test-bs-invalid-patch"}

	// args is an integer here, while the task template below declares it as []string.
	invalidPatch := runtime.RawExtension{Raw: []byte(`{"spec":{"process":{"args":3600}}}`)}

	bs := &sandboxv1alpha1.BatchSandbox{
		ObjectMeta: metav1.ObjectMeta{
			Name:      key.Name,
			Namespace: key.Namespace,
		},
		Spec: sandboxv1alpha1.BatchSandboxSpec{
			Replicas: ptr.To(int32(1)),
			Template: &corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{{Name: "main", Image: "img"}},
				},
			},
			TaskTemplate: &sandboxv1alpha1.TaskTemplateSpec{
				Spec: sandboxv1alpha1.TaskSpec{
					Process: &sandboxv1alpha1.ProcessTask{
						Command: []string{"sleep"},
						Args:    []string{"infinite"},
					},
				},
			},
			ShardTaskPatches: []runtime.RawExtension{invalidPatch},
		},
	}
	r := newTestReconciler(bs)

	result, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: key})
	require.NoError(t, err, "reconcile should not return an error for invalid user input")
	assert.Equal(t, ctrl.Result{}, result, "reconcile should return an empty result (no requeue)")

	// Re-read the object so the assertions run against what was persisted.
	updated := &sandboxv1alpha1.BatchSandbox{}
	require.NoError(t, r.Get(ctx, key, updated))

	// Locate the first InvalidShardPatch condition; point into the slice rather
	// than at the loop copy.
	var invalidPatchCond *sandboxv1alpha1.BatchSandboxCondition
	for i, cond := range updated.Status.Conditions {
		if cond.Type == sandboxv1alpha1.BatchSandboxConditionInvalidShardPatch {
			invalidPatchCond = &updated.Status.Conditions[i]
			break
		}
	}
	require.NotNil(t, invalidPatchCond, "InvalidShardPatch condition should be set")
	assert.Equal(t, sandboxv1alpha1.ConditionTrue, invalidPatchCond.Status)
	assert.Equal(t, "InvalidShardPatch", invalidPatchCond.Reason)

	// Reconcile is expected to bail out before the finalizer is added.
	hasFinalizer := controllerutil.ContainsFinalizer(updated, FinalizerTaskCleanup)
	assert.False(t, hasFinalizer,
		"finalizer should not have been added when shardTaskPatches are invalid")
}

// TestReconcile_InvalidShardTaskPatches_OnDeletion_ClearsFinalizer reconciles a
// BatchSandbox that is already marked for deletion, still holds the task-cleanup
// finalizer, and carries an invalid shardTaskPatch. It expects Reconcile to succeed
// with an empty result and the object to end up either gone or without the
// finalizer, i.e. not stuck in Terminating.
func TestReconcile_InvalidShardTaskPatches_OnDeletion_ClearsFinalizer(t *testing.T) {
	ctx := context.Background()
	key := types.NamespacedName{Namespace: "default", Name: "test-bs-delete-invalid"}
	deletedAt := metav1.Now()

	// args is an integer here, while the task template below declares it as []string.
	invalidPatch := runtime.RawExtension{Raw: []byte(`{"spec":{"process":{"args":3600}}}`)}

	bs := &sandboxv1alpha1.BatchSandbox{
		ObjectMeta: metav1.ObjectMeta{
			Name:              key.Name,
			Namespace:         key.Namespace,
			DeletionTimestamp: &deletedAt,
			// fake client requires non-zero ResourceVersion for objects with DeletionTimestamp.
			ResourceVersion: "1",
			Finalizers:      []string{FinalizerTaskCleanup},
		},
		Spec: sandboxv1alpha1.BatchSandboxSpec{
			Replicas: ptr.To(int32(1)),
			Template: &corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{{Name: "main", Image: "img"}},
				},
			},
			TaskTemplate: &sandboxv1alpha1.TaskTemplateSpec{
				Spec: sandboxv1alpha1.TaskSpec{
					Process: &sandboxv1alpha1.ProcessTask{
						Command: []string{"sleep"},
						Args:    []string{"infinite"},
					},
				},
			},
			ShardTaskPatches: []runtime.RawExtension{invalidPatch},
		},
	}
	r := newTestReconciler(bs)

	result, err := r.Reconcile(ctx, ctrl.Request{NamespacedName: key})
	require.NoError(t, err, "reconcile should not return an error on deletion with invalid patches")
	assert.Equal(t, ctrl.Result{}, result)

	// Once the last finalizer is removed from a terminating object the fake client
	// is expected to drop the resource, so "not found" is the normal outcome here.
	updated := &sandboxv1alpha1.BatchSandbox{}
	getErr := r.Get(ctx, key, updated)
	if getErr != nil {
		// Resource already garbage-collected — this is the expected happy path.
		require.True(t, apierrors.IsNotFound(getErr),
			"expected resource to be deleted after finalizer removal, got unexpected error: %v", getErr)
		return
	}

	// Resource still visible (e.g. fake client version differences) — confirm finalizer gone.
	assert.False(t, controllerutil.ContainsFinalizer(updated, FinalizerTaskCleanup),
		"finalizer should have been removed so the resource is no longer stuck in Terminating")
}
10 changes: 10 additions & 0 deletions kubernetes/internal/controller/batchsandbox_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ type runtimeView struct {
resumeCompleted bool
}

// hasCondition reports whether sbx carries a condition of type condType whose
// status is ConditionTrue. A condition of that type with any other status is
// treated the same as a missing one.
func hasCondition(sbx *sandboxv1alpha1.BatchSandbox, condType sandboxv1alpha1.BatchSandboxConditionType) bool {
	conditions := sbx.Status.Conditions
	for i := range conditions {
		if conditions[i].Type != condType {
			continue
		}
		if conditions[i].Status == sandboxv1alpha1.ConditionTrue {
			return true
		}
	}
	return false
}

func setConditionInStatus(
status *sandboxv1alpha1.BatchSandboxStatus,
conditionType sandboxv1alpha1.BatchSandboxConditionType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ type TaskSchedulingStrategy interface {
// NeedTaskScheduling determines whether the BatchSandbox requires task scheduling.
NeedTaskScheduling() bool

// ValidateShardTaskPatches checks that all shardTaskPatches can be merged into
// a TaskTemplateSpec without type errors. Returns nil when all patches are valid.
ValidateShardTaskPatches() error

// GenerateTaskSpecs generates the complete list of task specifications for the BatchSandbox.
GenerateTaskSpecs() ([]*api.Task, error)
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,36 @@ func (s *DefaultTaskSchedulingStrategy) NeedTaskScheduling() bool {
return s.Spec.TaskTemplate != nil
}

// ValidateShardTaskPatches checks that every shardTaskPatch can be successfully
// merged into a zero-value TaskTemplateSpec. It returns the first error encountered,
// including the patch index and the raw patch bytes to aid diagnosis.
func (s *DefaultTaskSchedulingStrategy) ValidateShardTaskPatches() error {
if len(s.Spec.ShardTaskPatches) == 0 {
return nil
}
// Zero-value base: we check structural correctness in isolation, not against any
// specific user template, so a bad patch is caught even when TaskTemplate is nil.
zeroBytes, err := json.Marshal(&sandboxv1alpha1.TaskTemplateSpec{})
if err != nil {
return fmt.Errorf("batchsandbox: failed to marshal zero TaskTemplateSpec: %w", err)
}
for i, patch := range s.Spec.ShardTaskPatches {
Comment thread
ashishpatel26 marked this conversation as resolved.
// Truncate patch in error messages to avoid persisting large blobs in status conditions.
patchSummary := patch.Raw
if len(patchSummary) > 200 {
patchSummary = append(patchSummary[:200], []byte("...(truncated)")...)
Comment on lines +59 to +61

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Avoid mutating shardTaskPatches while truncating diagnostics

For any patch whose raw JSON is longer than about 214 bytes, patchSummary still shares the backing array with patch.Raw, so this append(patchSummary[:200], ...) overwrites bytes 200 onward in the actual patch before StrategicMergePatch validates it. A valid long shardTaskPatches entry can therefore be rejected as malformed or, worse, used later in the same reconcile with altered command/env contents; make the summary from a copied slice or string instead of slicing the live patch bytes.

Useful? React with 👍 / 👎.

}
modified, mergeErr := strategicpatch.StrategicMergePatch(zeroBytes, patch.Raw, &sandboxv1alpha1.TaskTemplateSpec{})
if mergeErr != nil {
return fmt.Errorf("batchsandbox: shardTaskPatches[%d] failed schema validation: patch %s, err %w", i, patchSummary, mergeErr)
}
if err = json.Unmarshal(modified, &sandboxv1alpha1.TaskTemplateSpec{}); err != nil {
return fmt.Errorf("batchsandbox: shardTaskPatches[%d] produced invalid TaskTemplateSpec: patch %s, err %w", i, patchSummary, err)
}
}
return nil
}

// GenerateTaskSpecs generates task specifications for all replicas.
func (s *DefaultTaskSchedulingStrategy) GenerateTaskSpecs() ([]*api.Task, error) {
ret := make([]*api.Task, *s.Spec.Replicas)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,82 @@ import (
api "github.com/alibaba/OpenSandbox/sandbox-k8s/pkg/task-executor"
)

func TestDefaultTaskSchedulingStrategy_ValidateShardTaskPatches(t *testing.T) {
tests := []struct {
name string
batchSbx *sandboxv1alpha1.BatchSandbox
wantErr bool
}{
{
name: "no shard task patches - valid",
batchSbx: &sandboxv1alpha1.BatchSandbox{
Spec: sandboxv1alpha1.BatchSandboxSpec{
TaskTemplate: &sandboxv1alpha1.TaskTemplateSpec{},
},
},
wantErr: false,
},
{
name: "valid shard task patch with string args",
batchSbx: &sandboxv1alpha1.BatchSandbox{
Spec: sandboxv1alpha1.BatchSandboxSpec{
TaskTemplate: &sandboxv1alpha1.TaskTemplateSpec{},
ShardTaskPatches: []runtime.RawExtension{
{Raw: []byte(`{"spec":{"process":{"command":["sleep"],"args":["3600"]}}}`)},
},
},
},
wantErr: false,
},
{
name: "invalid shard task patch - args as integer instead of string array",
batchSbx: &sandboxv1alpha1.BatchSandbox{
Spec: sandboxv1alpha1.BatchSandboxSpec{
TaskTemplate: &sandboxv1alpha1.TaskTemplateSpec{},
ShardTaskPatches: []runtime.RawExtension{
{Raw: []byte(`{"spec":{"process":{"args":3600}}}`)},
},
},
},
wantErr: true,
},
{
name: "invalid shard task patch - malformed JSON",
batchSbx: &sandboxv1alpha1.BatchSandbox{
Spec: sandboxv1alpha1.BatchSandboxSpec{
TaskTemplate: &sandboxv1alpha1.TaskTemplateSpec{},
ShardTaskPatches: []runtime.RawExtension{
{Raw: []byte(`{"invalid json`)},
},
},
},
wantErr: true,
},
{
name: "second patch invalid - returns error with correct index",
batchSbx: &sandboxv1alpha1.BatchSandbox{
Spec: sandboxv1alpha1.BatchSandboxSpec{
TaskTemplate: &sandboxv1alpha1.TaskTemplateSpec{},
ShardTaskPatches: []runtime.RawExtension{
{Raw: []byte(`{"spec":{"process":{"args":["valid"]}}}`)},
{Raw: []byte(`{"spec":{"process":{"args":3600}}}`)},
},
},
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := NewDefaultTaskSchedulingStrategy(tt.batchSbx)
err := s.ValidateShardTaskPatches()
if (err != nil) != tt.wantErr {
t.Errorf("ValidateShardTaskPatches() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
Comment thread
ashishpatel26 marked this conversation as resolved.
}

func TestDefaultTaskSchedulingStrategy_NeedTaskScheduling(t *testing.T) {
tests := []struct {
name string
Expand Down
Loading