]> git.feebdaed.xyz Git - 0xmirror/kubernetes.git/commitdiff
schedule pod availability checks at the correct time in StatefulSets (#135428)
authorFilip Křepinský <atiratree@gmail.com>
Thu, 18 Dec 2025 06:35:21 +0000 (07:35 +0100)
committerGitHub <noreply@github.com>
Thu, 18 Dec 2025 06:35:21 +0000 (22:35 -0800)
* wire now (time) to the availability checks in the StatefulSet controller

- this helps to make the controller reconcilliation consistent

* schedule pod availability checks at the correct time in StatefulSets

* replace "k8s.io/klog/v2/ktesting" with "k8s.io/kubernetes/test/utils/ktesting"

for advanced features (e.g. Eventually)

* add StatefulSetAvailabilityCheck test

pkg/controller/statefulset/stateful_set.go
pkg/controller/statefulset/stateful_set_control.go
pkg/controller/statefulset/stateful_set_control_test.go
pkg/controller/statefulset/stateful_set_test.go
pkg/controller/statefulset/stateful_set_utils.go

index 257396f280973dba953760ac41d55828f14fc77f..922df499c1cc8963b94980ad1a3e71af3870aeb0 100644 (file)
@@ -44,6 +44,8 @@ import (
        "k8s.io/kubernetes/pkg/controller"
        "k8s.io/kubernetes/pkg/controller/history"
        "k8s.io/kubernetes/pkg/controller/statefulset/metrics"
+       "k8s.io/utils/clock"
+       "k8s.io/utils/ptr"
 
        "k8s.io/klog/v2"
 )
@@ -81,6 +83,7 @@ type StatefulSetController struct {
        queue workqueue.TypedRateLimitingInterface[string]
        // eventBroadcaster is the core of event processing pipeline.
        eventBroadcaster record.EventBroadcaster
+       clock            clock.PassiveClock
 }
 
 // NewStatefulSetController creates a new statefulset controller.
@@ -118,6 +121,7 @@ func NewStatefulSetController(
                podControl: controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder},
 
                eventBroadcaster: eventBroadcaster,
+               clock:            clock.RealClock{},
        }
 
        podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -266,9 +270,10 @@ func (ssc *StatefulSetController) updatePod(logger klog.Logger, old, cur interfa
                // having its status updated with the newly available replica.
                if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && set.Spec.MinReadySeconds > 0 {
                        logger.V(2).Info("StatefulSet will be enqueued after minReadySeconds for availability check", "statefulSet", klog.KObj(set), "minReadySeconds", set.Spec.MinReadySeconds)
-                       // Add a second to avoid milliseconds skew in AddAfter.
-                       // See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info.
-                       ssc.enqueueSSAfter(logger, set, (time.Duration(set.Spec.MinReadySeconds)*time.Second)+time.Second)
+                       // If there are multiple pods with varying readiness times, we cannot correctly track it
+                       // with the current queue. Further resyncs are attempted at the end of the syncStatefulSet
+                       // function.
+                       ssc.enqueueSSAfter(logger, set, time.Duration(set.Spec.MinReadySeconds)*time.Second)
                }
                return
        }
@@ -463,7 +468,7 @@ func (ssc *StatefulSetController) worker(ctx context.Context) {
 
 // sync syncs the given statefulset.
 func (ssc *StatefulSetController) sync(ctx context.Context, key string) error {
-       startTime := time.Now()
+       startTime := ssc.clock.Now()
        logger := klog.FromContext(ctx)
        defer func() {
                logger.V(4).Info("Finished syncing statefulset", "key", key, "time", time.Since(startTime))
@@ -507,16 +512,27 @@ func (ssc *StatefulSetController) syncStatefulSet(ctx context.Context, set *apps
        logger := klog.FromContext(ctx)
        logger.V(4).Info("Syncing StatefulSet with pods", "statefulSet", klog.KObj(set), "pods", len(pods))
        var status *apps.StatefulSetStatus
+       var nextSyncDuration *time.Duration
        var err error
-       status, err = ssc.control.UpdateStatefulSet(ctx, set, pods)
+       // Use the same time for calculating status and nextSyncDuration.
+       now := ssc.clock.Now()
+       status, err = ssc.control.UpdateStatefulSet(ctx, set, pods, now)
        if err != nil {
                return err
        }
        logger.V(4).Info("Successfully synced StatefulSet", "statefulSet", klog.KObj(set))
-       // One more sync to handle the clock skew. This is also helping in requeuing right after status update
-       if set.Spec.MinReadySeconds > 0 && status != nil && status.AvailableReplicas != *set.Spec.Replicas {
-               ssc.enqueueSSAfter(logger, set, time.Duration(set.Spec.MinReadySeconds)*time.Second)
+       // Plan the next availability check as a last line of defense against queue preemption (we have one queue key for checking availability of all the pods)
+       // or early sync (see https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info).
+       if set.Spec.MinReadySeconds > 0 && status != nil && status.ReadyReplicas != status.AvailableReplicas {
+               // Safeguard fallback to the .spec.minReadySeconds to ensure that we always end up with .status.availableReplicas updated.
+               nextSyncDuration = ptr.To(time.Duration(set.Spec.MinReadySeconds) * time.Second)
+               // Use the same point in time (now) for calculating status and nextSyncDuration to get matching availability for the pods.
+               if nextCheck := controller.FindMinNextPodAvailabilityCheck(pods, set.Spec.MinReadySeconds, now, ssc.clock); nextCheck != nil {
+                       nextSyncDuration = nextCheck
+               }
+       }
+       if nextSyncDuration != nil {
+               ssc.enqueueSSAfter(logger, set, *nextSyncDuration)
        }
-
        return nil
 }
index 888683457c3b382fa8a1eebdeb620ff5bb4ff73c..dd339cc7814fc47ed763f260ffaa77258e52e1cc 100644 (file)
@@ -20,6 +20,7 @@ import (
        "context"
        "sort"
        "sync"
+       "time"
 
        "k8s.io/klog/v2"
        "k8s.io/utils/lru"
@@ -48,7 +49,7 @@ type StatefulSetControlInterface interface {
        // If an implementation returns a non-nil error, the invocation will be retried using a rate-limited strategy.
        // Implementors should sink any errors that they do not wish to trigger a retry, and they may feel free to
        // exit exceptionally at any point provided they wish the update to be re-run at a later point in time.
-       UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error)
+       UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod, now time.Time) (*apps.StatefulSetStatus, error)
        // ListRevisions returns a array of the ControllerRevisions that represent the revisions of set. If the returned
        // error is nil, the returns slice of ControllerRevisions is valid.
        ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error)
@@ -83,7 +84,7 @@ type defaultStatefulSetControl struct {
 // strategy allows these constraints to be relaxed - pods will be created and deleted eagerly and
 // in no particular order. Clients using the burst strategy should be careful to ensure they
 // understand the consistency implications of having unpredictable numbers of pods available.
-func (ssc *defaultStatefulSetControl) UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
+func (ssc *defaultStatefulSetControl) UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod, now time.Time) (*apps.StatefulSetStatus, error) {
        set = set.DeepCopy() // set is modified when a new revision is created in performUpdate. Make a copy now to avoid mutation errors.
 
        // list all revisions and sort them
@@ -93,7 +94,7 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(ctx context.Context, set
        }
        history.SortControllerRevisions(revisions)
 
-       currentRevision, updateRevision, status, err := ssc.performUpdate(ctx, set, pods, revisions)
+       currentRevision, updateRevision, status, err := ssc.performUpdate(ctx, set, pods, revisions, now)
        if err != nil {
                errs := []error{err}
                if agg, ok := err.(utilerrors.Aggregate); ok {
@@ -107,7 +108,7 @@ func (ssc *defaultStatefulSetControl) UpdateStatefulSet(ctx context.Context, set
 }
 
 func (ssc *defaultStatefulSetControl) performUpdate(
-       ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, *apps.StatefulSetStatus, error) {
+       ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision, now time.Time) (*apps.ControllerRevision, *apps.ControllerRevision, *apps.StatefulSetStatus, error) {
        var currentStatus *apps.StatefulSetStatus
        logger := klog.FromContext(ctx)
        // get the current, and update revisions
@@ -117,7 +118,7 @@ func (ssc *defaultStatefulSetControl) performUpdate(
        }
 
        // perform the main update function and get the status
-       currentStatus, err = ssc.updateStatefulSet(ctx, set, currentRevision, updateRevision, collisionCount, pods)
+       currentStatus, err = ssc.updateStatefulSet(ctx, set, currentRevision, updateRevision, collisionCount, pods, now)
        if err != nil && currentStatus == nil {
                return currentRevision, updateRevision, nil, err
        }
@@ -367,7 +368,7 @@ type replicaStatus struct {
        updatedReplicas   int32
 }
 
-func computeReplicaStatus(pods []*v1.Pod, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision) replicaStatus {
+func computeReplicaStatus(pods []*v1.Pod, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision, now time.Time) replicaStatus {
        status := replicaStatus{}
        for _, pod := range pods {
                if isCreated(pod) {
@@ -378,7 +379,7 @@ func computeReplicaStatus(pods []*v1.Pod, minReadySeconds int32, currentRevision
                if isRunningAndReady(pod) {
                        status.readyReplicas++
                        // count the number of running and available replicas
-                       if isRunningAndAvailable(pod, minReadySeconds) {
+                       if isRunningAndAvailable(pod, minReadySeconds, now) {
                                status.availableReplicas++
                        }
 
@@ -398,14 +399,14 @@ func computeReplicaStatus(pods []*v1.Pod, minReadySeconds int32, currentRevision
        return status
 }
 
-func updateStatus(status *apps.StatefulSetStatus, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision, podLists ...[]*v1.Pod) {
+func updateStatus(status *apps.StatefulSetStatus, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision, now time.Time, podLists ...[]*v1.Pod) {
        status.Replicas = 0
        status.ReadyReplicas = 0
        status.AvailableReplicas = 0
        status.CurrentReplicas = 0
        status.UpdatedReplicas = 0
        for _, list := range podLists {
-               replicaStatus := computeReplicaStatus(list, minReadySeconds, currentRevision, updateRevision)
+               replicaStatus := computeReplicaStatus(list, minReadySeconds, currentRevision, updateRevision, now)
                status.Replicas += replicaStatus.replicas
                status.ReadyReplicas += replicaStatus.readyReplicas
                status.AvailableReplicas += replicaStatus.availableReplicas
@@ -414,13 +415,7 @@ func updateStatus(status *apps.StatefulSetStatus, minReadySeconds int32, current
        }
 }
 
-func (ssc *defaultStatefulSetControl) processReplica(
-       ctx context.Context,
-       set *apps.StatefulSet,
-       updateSet *apps.StatefulSet,
-       monotonic bool,
-       replicas []*v1.Pod,
-       i int) (bool, error) {
+func (ssc *defaultStatefulSetControl) processReplica(ctx context.Context, set *apps.StatefulSet, updateSet *apps.StatefulSet, monotonic bool, replicas []*v1.Pod, i int, now time.Time) (bool, error) {
        logger := klog.FromContext(ctx)
 
        // Note that pods with phase Succeeded will also trigger this event. This is
@@ -484,7 +479,7 @@ func (ssc *defaultStatefulSetControl) processReplica(
        // If we have a Pod that has been created but is not available we can not make progress.
        // We must ensure that all for each Pod, when we create it, all of its predecessors, with respect to its
        // ordinal, are Available.
-       if !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds) && monotonic {
+       if !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds, now) && monotonic {
                logger.V(4).Info("StatefulSet is waiting for Pod to be Available",
                        "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
                return true, nil
@@ -510,7 +505,7 @@ func (ssc *defaultStatefulSetControl) processReplica(
        return false, nil
 }
 
-func (ssc *defaultStatefulSetControl) processCondemned(ctx context.Context, set *apps.StatefulSet, firstUnhealthyPod *v1.Pod, monotonic bool, condemned []*v1.Pod, i int) (bool, error) {
+func (ssc *defaultStatefulSetControl) processCondemned(ctx context.Context, set *apps.StatefulSet, firstUnhealthyPod *v1.Pod, monotonic bool, condemned []*v1.Pod, i int, now time.Time) (bool, error) {
        logger := klog.FromContext(ctx)
        if isTerminating(condemned[i]) {
                // if we are in monotonic mode, block and wait for terminating pods to expire
@@ -528,7 +523,7 @@ func (ssc *defaultStatefulSetControl) processCondemned(ctx context.Context, set
                return true, nil
        }
        // if we are in monotonic mode and the condemned target is not the first unhealthy Pod, block.
-       if !isRunningAndAvailable(condemned[i], set.Spec.MinReadySeconds) && monotonic && condemned[i] != firstUnhealthyPod {
+       if !isRunningAndAvailable(condemned[i], set.Spec.MinReadySeconds, now) && monotonic && condemned[i] != firstUnhealthyPod {
                logger.V(4).Info("StatefulSet is waiting for Pod to be Available prior to scale down",
                        "statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod))
                return true, nil
@@ -563,13 +558,7 @@ func runForAll(pods []*v1.Pod, fn func(i int) (bool, error), monotonic bool) (bo
 // all Pods with ordinal less than UpdateStrategy.Partition.Ordinal must be at Status.CurrentRevision and all other
 // Pods must be at Status.UpdateRevision. If the returned error is nil, the returned StatefulSetStatus is valid and the
 // update must be recorded. If the error is not nil, the method should be retried until successful.
-func (ssc *defaultStatefulSetControl) updateStatefulSet(
-       ctx context.Context,
-       set *apps.StatefulSet,
-       currentRevision *apps.ControllerRevision,
-       updateRevision *apps.ControllerRevision,
-       collisionCount int32,
-       pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
+func (ssc *defaultStatefulSetControl) updateStatefulSet(ctx context.Context, set *apps.StatefulSet, currentRevision *apps.ControllerRevision, updateRevision *apps.ControllerRevision, collisionCount int32, pods []*v1.Pod, now time.Time) (*apps.StatefulSetStatus, error) {
        logger := klog.FromContext(ctx)
        // get the current and update revisions of the set.
        currentSet, err := ApplyRevision(set, currentRevision)
@@ -589,7 +578,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
        status.CollisionCount = new(int32)
        *status.CollisionCount = collisionCount
 
-       updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, pods)
+       updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, now, pods)
 
        replicaCount := int(*set.Spec.Replicas)
        // slice that will contain all Pods such that getStartOrdinal(set) <= getOrdinal(pod) <= getEndOrdinal(set)
@@ -630,7 +619,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
 
        // find the first unhealthy Pod
        for i := range replicas {
-               if isUnavailable(replicas[i], set.Spec.MinReadySeconds) {
+               if isUnavailable(replicas[i], set.Spec.MinReadySeconds, now) {
                        unavailable++
                        if firstUnavailablePod == nil {
                                firstUnavailablePod = replicas[i]
@@ -640,7 +629,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
 
        // or the first unhealthy condemned Pod (condemned are sorted in descending order for ease of use)
        for i := len(condemned) - 1; i >= 0; i-- {
-               if isUnavailable(condemned[i], set.Spec.MinReadySeconds) {
+               if isUnavailable(condemned[i], set.Spec.MinReadySeconds, now) {
                        unavailable++
                        if firstUnavailablePod == nil {
                                firstUnavailablePod = condemned[i]
@@ -662,10 +651,10 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
 
        // First, process each living replica. Exit if we run into an error or something blocking in monotonic mode.
        processReplicaFn := func(i int) (bool, error) {
-               return ssc.processReplica(ctx, set, updateSet, monotonic, replicas, i)
+               return ssc.processReplica(ctx, set, updateSet, monotonic, replicas, i, now)
        }
        if shouldExit, err := runForAll(replicas, processReplicaFn, monotonic); shouldExit || err != nil {
-               updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
+               updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, now, replicas, condemned)
                return &status, err
        }
 
@@ -681,7 +670,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
                return false, nil
        }
        if shouldExit, err := runForAll(condemned, fixPodClaim, monotonic); shouldExit || err != nil {
-               updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
+               updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, now, replicas, condemned)
                return &status, err
        }
 
@@ -692,14 +681,14 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
        // Note that we do not resurrect Pods in this interval. Also note that scaling will take precedence over
        // updates.
        processCondemnedFn := func(i int) (bool, error) {
-               return ssc.processCondemned(ctx, set, firstUnavailablePod, monotonic, condemned, i)
+               return ssc.processCondemned(ctx, set, firstUnavailablePod, monotonic, condemned, i, now)
        }
        if shouldExit, err := runForAll(condemned, processCondemnedFn, monotonic); shouldExit || err != nil {
-               updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
+               updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, now, replicas, condemned)
                return &status, err
        }
 
-       updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
+       updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, now, replicas, condemned)
 
        // for the OnDelete strategy we short circuit. Pods will be updated when they are manually deleted.
        if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType {
@@ -713,6 +702,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
                        replicas,
                        updateRevision,
                        status,
+                       now,
                )
        }
 
@@ -738,7 +728,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
                }
 
                // wait for unavailable Pods on update
-               if isUnavailable(replicas[target], set.Spec.MinReadySeconds) {
+               if isUnavailable(replicas[target], set.Spec.MinReadySeconds, now) {
                        logger.V(4).Info("StatefulSet is waiting for Pod to update",
                                "statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[target]))
                        return &status, nil
@@ -748,14 +738,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
        return &status, nil
 }
 
-func updateStatefulSetAfterInvariantEstablished(
-       ctx context.Context,
-       ssc *defaultStatefulSetControl,
-       set *apps.StatefulSet,
-       replicas []*v1.Pod,
-       updateRevision *apps.ControllerRevision,
-       status apps.StatefulSetStatus,
-) (*apps.StatefulSetStatus, error) {
+func updateStatefulSetAfterInvariantEstablished(ctx context.Context, ssc *defaultStatefulSetControl, set *apps.StatefulSet, replicas []*v1.Pod, updateRevision *apps.ControllerRevision, status apps.StatefulSetStatus, now time.Time) (*apps.StatefulSetStatus, error) {
 
        logger := klog.FromContext(ctx)
        replicaCount := int(*set.Spec.Replicas)
@@ -783,7 +766,7 @@ func updateStatefulSetAfterInvariantEstablished(
        unavailablePods := 0
 
        for target := len(replicas) - 1; target >= 0; target-- {
-               if isUnavailable(replicas[target], set.Spec.MinReadySeconds) {
+               if isUnavailable(replicas[target], set.Spec.MinReadySeconds, now) {
                        unavailablePods++
                }
        }
index 7fd7d529879bbbf3c7ab7ad94bfb57e30794ed74..734572e86d4c3408581cd2be30dbfc26b4b34c72 100644 (file)
@@ -359,7 +359,7 @@ func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc)
                if err != nil {
                        t.Error(err)
                }
-               if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+               if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
                        t.Errorf("Failed to update StatefulSet : %s", err)
                }
                set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@@ -369,7 +369,7 @@ func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc)
                if pods, err = om.setPodRunning(set, i); err != nil {
                        t.Error(err)
                }
-               if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+               if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
                        t.Errorf("Failed to update StatefulSet : %s", err)
                }
                set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@@ -384,7 +384,7 @@ func ReplacesPods(t *testing.T, set *apps.StatefulSet, invariants invariantFunc)
        if err != nil {
                t.Error(err)
        }
-       if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+       if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
                t.Errorf("Failed to update StatefulSet : %s", err)
        }
        set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@@ -408,7 +408,7 @@ func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc,
        if err != nil {
                t.Error(err)
        }
-       if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+       if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
                t.Errorf("Error updating StatefulSet %s", err)
        }
        if err := invariants(set, om); err != nil {
@@ -442,7 +442,7 @@ func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc,
                // Expect pod deletion failure
                om.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
                expectedNumOfDeleteRequests++
-               if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); !isOrHasInternalError(err) {
+               if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); !isOrHasInternalError(err) {
                        t.Errorf("StatefulSetControl did not return InternalError, found %s", err)
                }
                if err := invariants(set, om); err != nil {
@@ -458,7 +458,7 @@ func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc,
 
        // Expect pod deletion
        expectedNumOfDeleteRequests++
-       if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+       if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
                t.Errorf("Error updating StatefulSet %s", err)
        }
        if err := invariants(set, om); err != nil {
@@ -472,7 +472,7 @@ func recreatesPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc,
        }
 
        // Expect no additional delete calls and expect pod creation
-       if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+       if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
                t.Errorf("Error updating StatefulSet %s", err)
        }
        if err := invariants(set, om); err != nil {
@@ -584,7 +584,7 @@ func UpdatePodFailure(t *testing.T, set *apps.StatefulSet, invariants invariantF
        om.podsIndexer.Update(pods[0])
 
        // now it should fail
-       if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); !isOrHasInternalError(err) {
+       if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); !isOrHasInternalError(err) {
                t.Errorf("StatefulSetControl did not return InternalError, found %s", err)
        }
 }
@@ -657,7 +657,7 @@ func NewRevisionDeletePodFailure(t *testing.T, set *apps.StatefulSet, invariants
 
        // delete fails
        om.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
-       _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods)
+       _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
        if err == nil {
                t.Error("Expected err in update StatefulSet when deleting a pod")
        }
@@ -678,7 +678,7 @@ func NewRevisionDeletePodFailure(t *testing.T, set *apps.StatefulSet, invariants
 
        // delete works
        om.SetDeleteStatefulPodError(nil, 0)
-       status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods)
+       status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
        if err != nil {
                t.Fatalf("Unexpected err in update StatefulSet: %v", err)
        }
@@ -767,7 +767,7 @@ func RecreatesPVCForPendingPod(t *testing.T, set *apps.StatefulSet, invariants i
        if err != nil {
                t.Error(err)
        }
-       if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+       if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
                t.Errorf("Error updating StatefulSet %s", err)
        }
        if err := invariants(set, om); err != nil {
@@ -782,7 +782,7 @@ func RecreatesPVCForPendingPod(t *testing.T, set *apps.StatefulSet, invariants i
        }
        pods[0].Status.Phase = v1.PodPending
        om.podsIndexer.Update(pods[0])
-       if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+       if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
                t.Errorf("Error updating StatefulSet %s", err)
        }
        // invariants check if there any missing PVCs for the Pods
@@ -994,7 +994,7 @@ func TestStatefulSetControlRollingUpdateWithMaxUnavailable(t *testing.T) {
                spc.setPodRunning(set, 4)
                originalPods, _ := spc.setPodReadyCondition(set, 4, true)
                sort.Sort(ascendingOrdinal(originalPods))
-               if _, err := ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
+               if _, err := ssc.UpdateStatefulSet(context.TODO(), set, originalPods, time.Now()); err != nil {
                        t.Fatal(err)
                }
                pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
@@ -1008,7 +1008,7 @@ func TestStatefulSetControlRollingUpdateWithMaxUnavailable(t *testing.T) {
                }
 
                // create new pod 3
-               if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+               if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
                        t.Fatal(err)
                }
                pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
@@ -1037,7 +1037,7 @@ func TestStatefulSetControlRollingUpdateWithMaxUnavailable(t *testing.T) {
                spc.setPodReadyCondition(set, 4, true)
 
                // create new pod 4 (only one pod gets created at a time due to OrderedReady)
-               if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+               if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
                        t.Fatal(err)
                }
                pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
@@ -1106,7 +1106,7 @@ func TestStatefulSetControlRollingUpdateWithMaxUnavailable(t *testing.T) {
                sort.Sort(ascendingOrdinal(originalPods))
 
                // since maxUnavailable is 2, update pods 4 and 5, this will delete the pod 4 and 5,
-               if _, err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
+               if _, err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods, time.Now()); err != nil {
                        t.Fatal(err)
                }
                pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
@@ -1122,7 +1122,7 @@ func TestStatefulSetControlRollingUpdateWithMaxUnavailable(t *testing.T) {
                }
 
                // create new pods
-               if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+               if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
                        t.Fatal(err)
                }
                pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
@@ -1138,7 +1138,7 @@ func TestStatefulSetControlRollingUpdateWithMaxUnavailable(t *testing.T) {
                spc.setPodReadyCondition(set, 5, true)
                originalPods, _ = spc.setPodReadyCondition(set, 3, true)
                sort.Sort(ascendingOrdinal(originalPods))
-               if _, err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
+               if _, err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods, time.Now()); err != nil {
                        t.Fatal(err)
                }
                pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
@@ -1234,7 +1234,7 @@ func TestStatefulSetControlRollingUpdateWithMaxUnavailableInOrderedModeVerifyInv
 
                        // try to update the statefulset
                        // this function is only called in main code when feature gate is enabled
-                       if _, err = updateStatefulSetAfterInvariantEstablished(context.TODO(), ssc.(*defaultStatefulSetControl), set, originalPods, updateRevision, status); err != nil {
+                       if _, err = updateStatefulSetAfterInvariantEstablished(context.TODO(), ssc.(*defaultStatefulSetControl), set, originalPods, updateRevision, status, time.Now()); err != nil {
                                t.Fatal(err)
                        }
                        pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
@@ -1969,7 +1969,7 @@ func TestStatefulSetControlLimitsHistory(t *testing.T) {
                        if err != nil {
                                t.Fatalf("%s: %s", test.name, err)
                        }
-                       _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods)
+                       _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
                        if err != nil {
                                t.Fatalf("%s: %s", test.name, err)
                        }
@@ -2336,7 +2336,7 @@ func TestStatefulSetAvailability(t *testing.T) {
                if err != nil {
                        t.Fatalf("%s: %s", test.name, err)
                }
-               status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods)
+               status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
                if err != nil {
                        t.Fatalf("%s: %s", test.name, err)
                }
@@ -2407,7 +2407,7 @@ func TestStatefulSetStatusUpdate(t *testing.T) {
                        if err != nil {
                                t.Error(err)
                        }
-                       _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods)
+                       _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
                        if ssu.updateStatusTracker.requests != 1 {
                                t.Errorf("Did not update status")
                        }
@@ -3077,7 +3077,7 @@ func parallelScaleUpStatefulSetControl(set *apps.StatefulSet,
                }
 
                // run the controller once and check invariants
-               _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods)
+               _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
                if err != nil {
                        return err
                }
@@ -3113,14 +3113,14 @@ func parallelScaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetC
                        return err
                }
                sort.Sort(ascendingOrdinal(pods))
-               if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+               if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
                        return err
                }
                set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
                if err != nil {
                        return err
                }
-               if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+               if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
                        return err
                }
        }
@@ -3179,7 +3179,7 @@ func scaleUpStatefulSetControl(set *apps.StatefulSet,
                        }
                }
                // run the controller once and check invariants
-               _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods)
+               _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
                if err != nil {
                        return err
                }
@@ -3207,7 +3207,7 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn
                }
                sort.Sort(ascendingOrdinal(pods))
                if idx := len(pods) - 1; idx >= 0 {
-                       if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+                       if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
                                return err
                        }
                        set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@@ -3217,7 +3217,7 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn
                        if pods, err = om.addTerminatingPod(set, getOrdinal(pods[idx])); err != nil {
                                return err
                        }
-                       if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+                       if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
                                return err
                        }
                        set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@@ -3234,7 +3234,7 @@ func scaleDownStatefulSetControl(set *apps.StatefulSet, ssc StatefulSetControlIn
                                om.podsIndexer.Delete(pods[len(pods)-1])
                        }
                }
-               if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+               if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
                        return err
                }
                set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@@ -3327,7 +3327,7 @@ func updateStatefulSetControl(set *apps.StatefulSet,
        if err != nil {
                return err
        }
-       if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+       if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
                return err
        }
 
@@ -3375,7 +3375,7 @@ func updateStatefulSetControl(set *apps.StatefulSet,
                        }
                }
 
-               if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+               if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
                        return err
                }
                set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
@@ -3453,7 +3453,7 @@ func TestStatefulSetRollingUpdateRespectsMinReadySeconds(t *testing.T) {
        }
 
        // Perform update - should be blocked because pods haven't been ready for 30 seconds
-       status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods)
+       status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
        if err != nil {
                t.Fatalf("failed to update StatefulSet: %s", err)
        }
@@ -3483,7 +3483,7 @@ func TestStatefulSetRollingUpdateRespectsMinReadySeconds(t *testing.T) {
        }
 
        // Perform update again - now should proceed
-       status, err = ssc.UpdateStatefulSet(context.TODO(), set, pods)
+       status, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
        if err != nil {
                t.Fatalf("failed to update StatefulSet: %s", err)
        }
@@ -3537,7 +3537,7 @@ func TestStatefulSetScaleDownRespectsMinReadySeconds(t *testing.T) {
        }
 
        // Scale down should be blocked because pods haven't been available for 30 seconds
-       status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods)
+       status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
        if err != nil {
                t.Fatalf("failed to update StatefulSet: %s", err)
        }
@@ -3600,7 +3600,7 @@ func TestStatefulSetOnDeleteStrategyIgnoresMinReadySeconds(t *testing.T) {
        }
 
        // OnDelete strategy should complete regardless of MinReadySeconds
-       status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods)
+       status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
        if err != nil {
                t.Fatalf("failed to update StatefulSet: %s", err)
        }
@@ -3645,7 +3645,7 @@ func TestStatefulSetZeroMinReadySeconds(t *testing.T) {
        }
 
        // With zero MinReadySeconds, pods should be immediately available
-       status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods)
+       status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
        if err != nil {
                t.Fatalf("failed to update StatefulSet: %s", err)
        }
@@ -3695,7 +3695,7 @@ func TestStatefulSetPartitionRollingUpdateWithMinReadySeconds(t *testing.T) {
        }
 
        // Rolling update should be blocked because pods haven't been available for 30 seconds
-       status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods)
+       status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
        if err != nil {
                t.Fatalf("failed to update StatefulSet: %s", err)
        }
@@ -3798,7 +3798,7 @@ func TestStatefulSetMetrics(t *testing.T) {
                        ReadyReplicas: readyReplicas,
                }
                updateRevision := &apps.ControllerRevision{}
-               _, err = updateStatefulSetAfterInvariantEstablished(context.TODO(), ssc.(*defaultStatefulSetControl), set, pods, updateRevision, status)
+               _, err = updateStatefulSetAfterInvariantEstablished(context.TODO(), ssc.(*defaultStatefulSetControl), set, pods, updateRevision, status, time.Now())
                if err != nil {
                        t.Fatal(err)
                }
index 78a8cc95f75e1994794abae823a8d6e592fd5c30..cad55da0445ad7db5cc5c33416d210499cc0ba5b 100644 (file)
@@ -23,6 +23,9 @@ import (
        "fmt"
        "sort"
        "testing"
+       "time"
+
+       "github.com/onsi/gomega"
 
        apps "k8s.io/api/apps/v1"
        v1 "k8s.io/api/core/v1"
@@ -37,9 +40,9 @@ import (
        core "k8s.io/client-go/testing"
        "k8s.io/client-go/tools/cache"
        "k8s.io/klog/v2"
-       "k8s.io/klog/v2/ktesting"
        "k8s.io/kubernetes/pkg/controller"
        "k8s.io/kubernetes/pkg/controller/history"
+       "k8s.io/kubernetes/test/utils/ktesting"
 )
 
 var parentKind = apps.SchemeGroupVersion.WithKind("StatefulSet")
@@ -907,6 +910,66 @@ func TestStaleOwnerRefOnScaleup(t *testing.T) {
        }
 }
 
+func TestStatefulSetAvailabilityCheck(t *testing.T) {
+       _, ctx := ktesting.NewTestContext(t)
+
+       set := setMinReadySeconds(newStatefulSet(4), int32(5)) // 5 seconds
+       set = setupPodManagementPolicy(apps.ParallelPodManagement, set)
+       ssc, _, om, _ := newFakeStatefulSetController(ctx, set)
+       if err := om.setsIndexer.Add(set); err != nil {
+               t.Fatalf("could not add set to the cache: %v", err)
+       }
+       now := time.Now()
+
+       pods := []*v1.Pod{}
+       pods = append(pods, newStatefulSetPod(set, 0))
+       pods = append(pods, newStatefulSetPod(set, 1))
+       pods[1].Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: metav1.Time{Time: now}}}
+       pods = append(pods, newStatefulSetPod(set, 2))
+       pods[2].Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: metav1.Time{Time: now.Add(-2 * time.Second)}}}
+       pods = append(pods, newStatefulSetPod(set, 3))
+       pods[3].Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: metav1.Time{Time: now.Add(-4300 * time.Millisecond)}}}
+
+       for i, pod := range pods {
+               if err := om.podsIndexer.Add(pod); err != nil {
+                       t.Fatalf("could not add pod to the cache %d: %v", i, err)
+               }
+               var err error
+               if pods, err = om.setPodRunning(set, i); err != nil {
+                       t.Fatalf("%d: %v", i, err)
+               }
+       }
+       err := ssc.syncStatefulSet(ctx, set, pods)
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       if set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name); err != nil {
+               t.Fatalf("Could not get StatefulSet: %v", err)
+       }
+
+       // one pod is not ready
+       if set.Status.ReadyReplicas != 3 {
+               t.Errorf("Expected updated StatefulSet to contain ready replicas %v, got %v instead",
+                       3, set.Status.ReadyReplicas)
+       }
+       if set.Status.AvailableReplicas != 0 {
+               t.Errorf("Expected updated StatefulSet to contain available replicas %v, got %v instead",
+                       0, set.Status.AvailableReplicas)
+       }
+
+       if got, want := ssc.queue.Len(), 0; got != want {
+               t.Errorf("queue.Len() = %v, want %v", got, want)
+       }
+
+       // RS should be re-queued after 700ms to recompute .status.availableReplicas (200ms extra for the test).
+       ktesting.Eventually(ctx, func(tCtx ktesting.TContext) int {
+               return ssc.queue.Len()
+       }).WithTimeout(900*time.Millisecond).
+               WithPolling(10*time.Millisecond).
+               Should(gomega.Equal(1), " StatefulSet should be re-queued to recompute .status.availableReplicas")
+}
+
 func newFakeStatefulSetController(ctx context.Context, initialObjects ...runtime.Object) (*StatefulSetController, *StatefulPodControl, *fakeObjectManager, history.Interface) {
        client := fake.NewSimpleClientset(initialObjects...)
        informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
index 27465419bc0d9d2b0bbf0048205270a0ba184f20..b63aa62dfdabf1ec59daad5a152917fb1fc6f2cd 100644 (file)
@@ -21,6 +21,7 @@ import (
        "fmt"
        "regexp"
        "strconv"
+       "time"
 
        apps "k8s.io/api/apps/v1"
        v1 "k8s.io/api/core/v1"
@@ -457,8 +458,8 @@ func isRunningAndReady(pod *v1.Pod) bool {
        return pod.Status.Phase == v1.PodRunning && podutil.IsPodReady(pod)
 }
 
-func isRunningAndAvailable(pod *v1.Pod, minReadySeconds int32) bool {
-       return podutil.IsPodAvailable(pod, minReadySeconds, metav1.Now())
+func isRunningAndAvailable(pod *v1.Pod, minReadySeconds int32, now time.Time) bool {
+       return podutil.IsPodAvailable(pod, minReadySeconds, metav1.Time{Time: now})
 }
 
 // isCreated returns true if pod has been created and is maintained by the API server
@@ -487,8 +488,8 @@ func isTerminating(pod *v1.Pod) bool {
 }
 
 // isUnavailable returns true if pod is not available or if it is terminating
-func isUnavailable(pod *v1.Pod, minReadySeconds int32) bool {
-       return !isRunningAndAvailable(pod, minReadySeconds) || isTerminating(pod)
+func isUnavailable(pod *v1.Pod, minReadySeconds int32, now time.Time) bool {
+       return !podutil.IsPodAvailable(pod, minReadySeconds, metav1.Time{Time: now}) || isTerminating(pod)
 }
 
 // allowsBurst is true if the alpha burst annotation is set.