"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/history"
"k8s.io/kubernetes/pkg/controller/statefulset/metrics"
+ "k8s.io/utils/clock"
+ "k8s.io/utils/ptr"
"k8s.io/klog/v2"
)
queue workqueue.TypedRateLimitingInterface[string]
// eventBroadcaster is the core of the event processing pipeline.
eventBroadcaster record.EventBroadcaster
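+ // clock is injected so that tests can control the time used in availability calculations.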
+ clock clock.PassiveClock
}
// NewStatefulSetController creates a new statefulset controller.
podControl: controller.RealPodControl{KubeClient: kubeClient, Recorder: recorder},
eventBroadcaster: eventBroadcaster,
+ clock: clock.RealClock{},
}
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
// having its status updated with the newly available replica.
if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && set.Spec.MinReadySeconds > 0 {
logger.V(2).Info("StatefulSet will be enqueued after minReadySeconds for availability check", "statefulSet", klog.KObj(set), "minReadySeconds", set.Spec.MinReadySeconds)
- // Add a second to avoid milliseconds skew in AddAfter.
- // See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info.
- ssc.enqueueSSAfter(logger, set, (time.Duration(set.Spec.MinReadySeconds)*time.Second)+time.Second)
+ // If there are multiple pods with varying readiness times, a single delayed enqueue
+ // cannot track them all. Further resyncs are attempted at the end of the
+ // syncStatefulSet function.
+ ssc.enqueueSSAfter(logger, set, time.Duration(set.Spec.MinReadySeconds)*time.Second)
}
return
}
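Because the controller now reads time from an injected clock.PassiveClock instead of calling time.Now directly, tests can substitute a fake clock. A minimal sketch, assuming ssc holds a controller built by NewStatefulSetController and testingclock is imported as "k8s.io/utils/clock/testing":

	// Replace the real clock with a fake one so availability checks are deterministic.
	fakeClock := testingclock.NewFakePassiveClock(time.Now())
	ssc.clock = fakeClock
	// Jump past .spec.minReadySeconds so ready pods count as available without sleeping.
	fakeClock.SetTime(fakeClock.Now().Add(30 * time.Second))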
// sync syncs the given statefulset.
func (ssc *StatefulSetController) sync(ctx context.Context, key string) error {
- startTime := time.Now()
+ startTime := ssc.clock.Now()
logger := klog.FromContext(ctx)
defer func() {
logger.V(4).Info("Finished syncing statefulset", "key", key, "time", time.Since(startTime))
logger := klog.FromContext(ctx)
logger.V(4).Info("Syncing StatefulSet with pods", "statefulSet", klog.KObj(set), "pods", len(pods))
var status *apps.StatefulSetStatus
+ var nextSyncDuration *time.Duration
var err error
- status, err = ssc.control.UpdateStatefulSet(ctx, set, pods)
+ // Use the same time for calculating status and nextSyncDuration.
+ now := ssc.clock.Now()
+ status, err = ssc.control.UpdateStatefulSet(ctx, set, pods, now)
if err != nil {
return err
}
logger.V(4).Info("Successfully synced StatefulSet", "statefulSet", klog.KObj(set))
- // One more sync to handle the clock skew. This is also helping in requeuing right after status update
- if set.Spec.MinReadySeconds > 0 && status != nil && status.AvailableReplicas != *set.Spec.Replicas {
- ssc.enqueueSSAfter(logger, set, time.Duration(set.Spec.MinReadySeconds)*time.Second)
+ // Plan the next availability check as a last line of defense against queue preemption (a single
+ // queue key covers the availability checks for all of the pods) or an early sync
+ // (see https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info).
+ if set.Spec.MinReadySeconds > 0 && status != nil && status.ReadyReplicas != status.AvailableReplicas {
+ // Fall back to .spec.minReadySeconds as a safeguard so that .status.availableReplicas is always eventually updated.
+ nextSyncDuration = ptr.To(time.Duration(set.Spec.MinReadySeconds) * time.Second)
+ // Use the same point in time (now) for calculating status and nextSyncDuration to get matching availability for the pods.
+ if nextCheck := controller.FindMinNextPodAvailabilityCheck(pods, set.Spec.MinReadySeconds, now, ssc.clock); nextCheck != nil {
+ nextSyncDuration = nextCheck
+ }
+ }
+ if nextSyncDuration != nil {
+ ssc.enqueueSSAfter(logger, set, *nextSyncDuration)
}
-
return nil
}
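FindMinNextPodAvailabilityCheck is referenced above but defined elsewhere in this change. For orientation, a minimal sketch of what such a helper could look like, assuming it returns the shortest wait until a ready-but-not-yet-available pod passes minReadySeconds (the name and signature are taken from the call site; the real implementation in pkg/controller may differ):

	import (
		"time"

		v1 "k8s.io/api/core/v1"
		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
		podutil "k8s.io/kubernetes/pkg/api/v1/pod"
		"k8s.io/utils/clock"
	)

	// Sketch only. Returns nil when no pod is waiting on minReadySeconds.
	func FindMinNextPodAvailabilityCheck(pods []*v1.Pod, minReadySeconds int32, now time.Time, c clock.PassiveClock) *time.Duration {
		var next *time.Duration
		for _, pod := range pods {
			cond := podutil.GetPodReadyCondition(pod.Status)
			// Skip pods that are not ready, or that already count as available at "now".
			if cond == nil || cond.Status != v1.ConditionTrue || podutil.IsPodAvailable(pod, minReadySeconds, metav1.Time{Time: now}) {
				continue
			}
			// The pod becomes available minReadySeconds after its last ready transition;
			// convert that deadline into a delay relative to the live clock.
			d := cond.LastTransitionTime.Add(time.Duration(minReadySeconds) * time.Second).Sub(c.Now())
			if d < 0 {
				d = 0 // deadline already passed; resync immediately
			}
			if next == nil || d < *next {
				next = &d
			}
		}
		return next
	}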
"context"
"sort"
"sync"
+ "time"
"k8s.io/klog/v2"
"k8s.io/utils/lru"
// If an implementation returns a non-nil error, the invocation will be retried using a rate-limited strategy.
// Implementors should sink any errors that they do not wish to trigger a retry, and they may feel free to
// exit exceptionally at any point provided they wish the update to be re-run at a later point in time.
- UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error)
+ UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod, now time.Time) (*apps.StatefulSetStatus, error)
// ListRevisions returns an array of the ControllerRevisions that represent the revisions of set. If the returned
// error is nil, the returned slice of ControllerRevisions is valid.
ListRevisions(set *apps.StatefulSet) ([]*apps.ControllerRevision, error)
// strategy allows these constraints to be relaxed - pods will be created and deleted eagerly and
// in no particular order. Clients using the burst strategy should be careful to ensure they
// understand the consistency implications of having unpredictable numbers of pods available.
-func (ssc *defaultStatefulSetControl) UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
+func (ssc *defaultStatefulSetControl) UpdateStatefulSet(ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod, now time.Time) (*apps.StatefulSetStatus, error) {
set = set.DeepCopy() // set is modified when a new revision is created in performUpdate. Make a copy now to avoid mutation errors.
// list all revisions and sort them
}
history.SortControllerRevisions(revisions)
- currentRevision, updateRevision, status, err := ssc.performUpdate(ctx, set, pods, revisions)
+ currentRevision, updateRevision, status, err := ssc.performUpdate(ctx, set, pods, revisions, now)
if err != nil {
errs := []error{err}
if agg, ok := err.(utilerrors.Aggregate); ok {
}
func (ssc *defaultStatefulSetControl) performUpdate(
- ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision) (*apps.ControllerRevision, *apps.ControllerRevision, *apps.StatefulSetStatus, error) {
+ ctx context.Context, set *apps.StatefulSet, pods []*v1.Pod, revisions []*apps.ControllerRevision, now time.Time) (*apps.ControllerRevision, *apps.ControllerRevision, *apps.StatefulSetStatus, error) {
var currentStatus *apps.StatefulSetStatus
logger := klog.FromContext(ctx)
// get the current, and update revisions
}
// perform the main update function and get the status
- currentStatus, err = ssc.updateStatefulSet(ctx, set, currentRevision, updateRevision, collisionCount, pods)
+ currentStatus, err = ssc.updateStatefulSet(ctx, set, currentRevision, updateRevision, collisionCount, pods, now)
if err != nil && currentStatus == nil {
return currentRevision, updateRevision, nil, err
}
updatedReplicas int32
}
-func computeReplicaStatus(pods []*v1.Pod, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision) replicaStatus {
+func computeReplicaStatus(pods []*v1.Pod, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision, now time.Time) replicaStatus {
status := replicaStatus{}
for _, pod := range pods {
if isCreated(pod) {
if isRunningAndReady(pod) {
status.readyReplicas++
// count the number of running and available replicas
- if isRunningAndAvailable(pod, minReadySeconds) {
+ if isRunningAndAvailable(pod, minReadySeconds, now) {
status.availableReplicas++
}
return status
}
-func updateStatus(status *apps.StatefulSetStatus, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision, podLists ...[]*v1.Pod) {
+func updateStatus(status *apps.StatefulSetStatus, minReadySeconds int32, currentRevision, updateRevision *apps.ControllerRevision, now time.Time, podLists ...[]*v1.Pod) {
status.Replicas = 0
status.ReadyReplicas = 0
status.AvailableReplicas = 0
status.CurrentReplicas = 0
status.UpdatedReplicas = 0
for _, list := range podLists {
- replicaStatus := computeReplicaStatus(list, minReadySeconds, currentRevision, updateRevision)
+ replicaStatus := computeReplicaStatus(list, minReadySeconds, currentRevision, updateRevision, now)
status.Replicas += replicaStatus.replicas
status.ReadyReplicas += replicaStatus.readyReplicas
status.AvailableReplicas += replicaStatus.availableReplicas
}
}
-func (ssc *defaultStatefulSetControl) processReplica(
- ctx context.Context,
- set *apps.StatefulSet,
- updateSet *apps.StatefulSet,
- monotonic bool,
- replicas []*v1.Pod,
- i int) (bool, error) {
+func (ssc *defaultStatefulSetControl) processReplica(ctx context.Context, set *apps.StatefulSet, updateSet *apps.StatefulSet, monotonic bool, replicas []*v1.Pod, i int, now time.Time) (bool, error) {
logger := klog.FromContext(ctx)
// Note that pods with phase Succeeded will also trigger this event. This is
// If we have a Pod that has been created but is not available, we cannot make progress.
// We must ensure, for each Pod we create, that all of its predecessors, with respect to its
// ordinal, are Available.
- if !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds) && monotonic {
+ if !isRunningAndAvailable(replicas[i], set.Spec.MinReadySeconds, now) && monotonic {
logger.V(4).Info("StatefulSet is waiting for Pod to be Available",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[i]))
return true, nil
return false, nil
}
-func (ssc *defaultStatefulSetControl) processCondemned(ctx context.Context, set *apps.StatefulSet, firstUnhealthyPod *v1.Pod, monotonic bool, condemned []*v1.Pod, i int) (bool, error) {
+func (ssc *defaultStatefulSetControl) processCondemned(ctx context.Context, set *apps.StatefulSet, firstUnhealthyPod *v1.Pod, monotonic bool, condemned []*v1.Pod, i int, now time.Time) (bool, error) {
logger := klog.FromContext(ctx)
if isTerminating(condemned[i]) {
// if we are in monotonic mode, block and wait for terminating pods to expire
return true, nil
}
// if we are in monotonic mode and the condemned target is not the first unhealthy Pod, block.
- if !isRunningAndAvailable(condemned[i], set.Spec.MinReadySeconds) && monotonic && condemned[i] != firstUnhealthyPod {
+ if !isRunningAndAvailable(condemned[i], set.Spec.MinReadySeconds, now) && monotonic && condemned[i] != firstUnhealthyPod {
logger.V(4).Info("StatefulSet is waiting for Pod to be Available prior to scale down",
"statefulSet", klog.KObj(set), "pod", klog.KObj(firstUnhealthyPod))
return true, nil
// all Pods with ordinal less than UpdateStrategy.Partition.Ordinal must be at Status.CurrentRevision and all other
// Pods must be at Status.UpdateRevision. If the returned error is nil, the returned StatefulSetStatus is valid and the
// update must be recorded. If the error is not nil, the method should be retried until successful.
-func (ssc *defaultStatefulSetControl) updateStatefulSet(
- ctx context.Context,
- set *apps.StatefulSet,
- currentRevision *apps.ControllerRevision,
- updateRevision *apps.ControllerRevision,
- collisionCount int32,
- pods []*v1.Pod) (*apps.StatefulSetStatus, error) {
+func (ssc *defaultStatefulSetControl) updateStatefulSet(ctx context.Context, set *apps.StatefulSet, currentRevision *apps.ControllerRevision, updateRevision *apps.ControllerRevision, collisionCount int32, pods []*v1.Pod, now time.Time) (*apps.StatefulSetStatus, error) {
logger := klog.FromContext(ctx)
// get the current and update revisions of the set.
currentSet, err := ApplyRevision(set, currentRevision)
status.CollisionCount = new(int32)
*status.CollisionCount = collisionCount
- updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, pods)
+ updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, now, pods)
replicaCount := int(*set.Spec.Replicas)
// slice that will contain all Pods such that getStartOrdinal(set) <= getOrdinal(pod) <= getEndOrdinal(set)
// find the first unavailable Pod
for i := range replicas {
- if isUnavailable(replicas[i], set.Spec.MinReadySeconds) {
+ if isUnavailable(replicas[i], set.Spec.MinReadySeconds, now) {
unavailable++
if firstUnavailablePod == nil {
firstUnavailablePod = replicas[i]
// or the first unavailable condemned Pod (condemned are sorted in descending order for ease of use)
for i := len(condemned) - 1; i >= 0; i-- {
- if isUnavailable(condemned[i], set.Spec.MinReadySeconds) {
+ if isUnavailable(condemned[i], set.Spec.MinReadySeconds, now) {
unavailable++
if firstUnavailablePod == nil {
firstUnavailablePod = condemned[i]
// First, process each living replica. Exit if we run into an error or something blocking in monotonic mode.
processReplicaFn := func(i int) (bool, error) {
- return ssc.processReplica(ctx, set, updateSet, monotonic, replicas, i)
+ return ssc.processReplica(ctx, set, updateSet, monotonic, replicas, i, now)
}
if shouldExit, err := runForAll(replicas, processReplicaFn, monotonic); shouldExit || err != nil {
- updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
+ updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, now, replicas, condemned)
return &status, err
}
return false, nil
}
if shouldExit, err := runForAll(condemned, fixPodClaim, monotonic); shouldExit || err != nil {
- updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
+ updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, now, replicas, condemned)
return &status, err
}
// Note that we do not resurrect Pods in this interval. Also note that scaling will take precedence over
// updates.
processCondemnedFn := func(i int) (bool, error) {
- return ssc.processCondemned(ctx, set, firstUnavailablePod, monotonic, condemned, i)
+ return ssc.processCondemned(ctx, set, firstUnavailablePod, monotonic, condemned, i, now)
}
if shouldExit, err := runForAll(condemned, processCondemnedFn, monotonic); shouldExit || err != nil {
- updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
+ updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, now, replicas, condemned)
return &status, err
}
- updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, replicas, condemned)
+ updateStatus(&status, set.Spec.MinReadySeconds, currentRevision, updateRevision, now, replicas, condemned)
// for the OnDelete strategy we short circuit. Pods will be updated when they are manually deleted.
if set.Spec.UpdateStrategy.Type == apps.OnDeleteStatefulSetStrategyType {
replicas,
updateRevision,
status,
+ now,
)
}
}
// wait for unavailable Pods on update
- if isUnavailable(replicas[target], set.Spec.MinReadySeconds) {
+ if isUnavailable(replicas[target], set.Spec.MinReadySeconds, now) {
logger.V(4).Info("StatefulSet is waiting for Pod to update",
"statefulSet", klog.KObj(set), "pod", klog.KObj(replicas[target]))
return &status, nil
return &status, nil
}
-func updateStatefulSetAfterInvariantEstablished(
- ctx context.Context,
- ssc *defaultStatefulSetControl,
- set *apps.StatefulSet,
- replicas []*v1.Pod,
- updateRevision *apps.ControllerRevision,
- status apps.StatefulSetStatus,
-) (*apps.StatefulSetStatus, error) {
+func updateStatefulSetAfterInvariantEstablished(ctx context.Context, ssc *defaultStatefulSetControl, set *apps.StatefulSet, replicas []*v1.Pod, updateRevision *apps.ControllerRevision, status apps.StatefulSetStatus, now time.Time) (*apps.StatefulSetStatus, error) {
logger := klog.FromContext(ctx)
replicaCount := int(*set.Spec.Replicas)
unavailablePods := 0
for target := len(replicas) - 1; target >= 0; target-- {
- if isUnavailable(replicas[target], set.Spec.MinReadySeconds) {
+ if isUnavailable(replicas[target], set.Spec.MinReadySeconds, now) {
unavailablePods++
}
}
if err != nil {
t.Error(err)
}
- if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+ if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
t.Errorf("Failed to update StatefulSet : %s", err)
}
set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if pods, err = om.setPodRunning(set, i); err != nil {
t.Error(err)
}
- if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+ if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
t.Errorf("Failed to update StatefulSet : %s", err)
}
set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
t.Error(err)
}
- if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+ if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
t.Errorf("Failed to update StatefulSet : %s", err)
}
set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
t.Error(err)
}
- if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+ if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := invariants(set, om); err != nil {
// Expect pod deletion failure
om.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
expectedNumOfDeleteRequests++
- if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); !isOrHasInternalError(err) {
+ if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); !isOrHasInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError, found %s", err)
}
if err := invariants(set, om); err != nil {
// Expect pod deletion
expectedNumOfDeleteRequests++
- if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+ if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := invariants(set, om); err != nil {
}
// Expect no additional delete calls and expect pod creation
- if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+ if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := invariants(set, om); err != nil {
om.podsIndexer.Update(pods[0])
// now it should fail
- if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); !isOrHasInternalError(err) {
+ if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); !isOrHasInternalError(err) {
t.Errorf("StatefulSetControl did not return InternalError, found %s", err)
}
}
// delete fails
om.SetDeleteStatefulPodError(apierrors.NewInternalError(errors.New("API server failed")), 0)
- _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods)
+ _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
if err == nil {
t.Error("Expected err in update StatefulSet when deleting a pod")
}
// delete works
om.SetDeleteStatefulPodError(nil, 0)
- status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods)
+ status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
if err != nil {
t.Fatalf("Unexpected err in update StatefulSet: %v", err)
}
if err != nil {
t.Error(err)
}
- if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+ if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := invariants(set, om); err != nil {
}
pods[0].Status.Phase = v1.PodPending
om.podsIndexer.Update(pods[0])
- if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+ if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
// invariants check if there are any missing PVCs for the Pods
spc.setPodRunning(set, 4)
originalPods, _ := spc.setPodReadyCondition(set, 4, true)
sort.Sort(ascendingOrdinal(originalPods))
- if _, err := ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
+ if _, err := ssc.UpdateStatefulSet(context.TODO(), set, originalPods, time.Now()); err != nil {
t.Fatal(err)
}
pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
}
// create new pod 3
- if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+ if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
t.Fatal(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
spc.setPodReadyCondition(set, 4, true)
// create new pod 4 (only one pod gets created at a time due to OrderedReady)
- if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+ if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
t.Fatal(err)
}
pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
sort.Sort(ascendingOrdinal(originalPods))
// since maxUnavailable is 2, update pods 4 and 5; this will delete pods 4 and 5,
- if _, err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
+ if _, err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods, time.Now()); err != nil {
t.Fatal(err)
}
pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
}
// create new pods
- if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+ if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
t.Fatal(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
spc.setPodReadyCondition(set, 5, true)
originalPods, _ = spc.setPodReadyCondition(set, 3, true)
sort.Sort(ascendingOrdinal(originalPods))
- if _, err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods); err != nil {
+ if _, err = ssc.UpdateStatefulSet(context.TODO(), set, originalPods, time.Now()); err != nil {
t.Fatal(err)
}
pods, err = spc.podsLister.Pods(set.Namespace).List(selector)
// try to update the statefulset
// this function is only called in main code when the feature gate is enabled
- if _, err = updateStatefulSetAfterInvariantEstablished(context.TODO(), ssc.(*defaultStatefulSetControl), set, originalPods, updateRevision, status); err != nil {
+ if _, err = updateStatefulSetAfterInvariantEstablished(context.TODO(), ssc.(*defaultStatefulSetControl), set, originalPods, updateRevision, status, time.Now()); err != nil {
t.Fatal(err)
}
pods, err := spc.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Fatalf("%s: %s", test.name, err)
}
- _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods)
+ _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
if err != nil {
t.Fatalf("%s: %s", test.name, err)
}
if err != nil {
t.Fatalf("%s: %s", test.name, err)
}
- status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods)
+ status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
if err != nil {
t.Fatalf("%s: %s", test.name, err)
}
if err != nil {
t.Error(err)
}
- _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods)
+ _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
if ssu.updateStatusTracker.requests != 1 {
t.Errorf("Did not update status")
}
}
// run the controller once and check invariants
- _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods)
+ _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
if err != nil {
return err
}
return err
}
sort.Sort(ascendingOrdinal(pods))
- if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+ if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
return err
}
set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
return err
}
- if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+ if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
return err
}
}
}
}
// run the controller once and check invariants
- _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods)
+ _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
if err != nil {
return err
}
}
sort.Sort(ascendingOrdinal(pods))
if idx := len(pods) - 1; idx >= 0 {
- if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+ if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
return err
}
set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if pods, err = om.addTerminatingPod(set, getOrdinal(pods[idx])); err != nil {
return err
}
- if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+ if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
return err
}
set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
om.podsIndexer.Delete(pods[len(pods)-1])
}
}
- if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+ if _, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
return err
}
set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
if err != nil {
return err
}
- if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+ if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
return err
}
}
}
- if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods); err != nil {
+ if _, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now()); err != nil {
return err
}
set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name)
}
// Perform update - should be blocked because pods haven't been ready for 30 seconds
- status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods)
+ status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
if err != nil {
t.Fatalf("failed to update StatefulSet: %s", err)
}
}
// Perform update again - now should proceed
- status, err = ssc.UpdateStatefulSet(context.TODO(), set, pods)
+ status, err = ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
if err != nil {
t.Fatalf("failed to update StatefulSet: %s", err)
}
}
// Scale down should be blocked because pods haven't been available for 30 seconds
- status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods)
+ status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
if err != nil {
t.Fatalf("failed to update StatefulSet: %s", err)
}
}
// OnDelete strategy should complete regardless of MinReadySeconds
- status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods)
+ status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
if err != nil {
t.Fatalf("failed to update StatefulSet: %s", err)
}
}
// With zero MinReadySeconds, pods should be immediately available
- status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods)
+ status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
if err != nil {
t.Fatalf("failed to update StatefulSet: %s", err)
}
}
// Rolling update should be blocked because pods haven't been available for 30 seconds
- status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods)
+ status, err := ssc.UpdateStatefulSet(context.TODO(), set, pods, time.Now())
if err != nil {
t.Fatalf("failed to update StatefulSet: %s", err)
}
ReadyReplicas: readyReplicas,
}
updateRevision := &apps.ControllerRevision{}
- _, err = updateStatefulSetAfterInvariantEstablished(context.TODO(), ssc.(*defaultStatefulSetControl), set, pods, updateRevision, status)
+ _, err = updateStatefulSetAfterInvariantEstablished(context.TODO(), ssc.(*defaultStatefulSetControl), set, pods, updateRevision, status, time.Now())
if err != nil {
t.Fatal(err)
}
"fmt"
"sort"
"testing"
+ "time"
+
+ "github.com/onsi/gomega"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
- "k8s.io/klog/v2/ktesting"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/history"
+ "k8s.io/kubernetes/test/utils/ktesting"
)
var parentKind = apps.SchemeGroupVersion.WithKind("StatefulSet")
}
}
+func TestStatefulSetAvailabilityCheck(t *testing.T) {
+ _, ctx := ktesting.NewTestContext(t)
+
+ set := setMinReadySeconds(newStatefulSet(4), int32(5)) // 5 seconds
+ set = setupPodManagementPolicy(apps.ParallelPodManagement, set)
+ ssc, _, om, _ := newFakeStatefulSetController(ctx, set)
+ if err := om.setsIndexer.Add(set); err != nil {
+ t.Fatalf("could not add set to the cache: %v", err)
+ }
+ now := time.Now()
+
+ pods := []*v1.Pod{}
+ pods = append(pods, newStatefulSetPod(set, 0))
+ pods = append(pods, newStatefulSetPod(set, 1))
+ pods[1].Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: metav1.Time{Time: now}}}
+ pods = append(pods, newStatefulSetPod(set, 2))
+ pods[2].Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: metav1.Time{Time: now.Add(-2 * time.Second)}}}
+ pods = append(pods, newStatefulSetPod(set, 3))
+ pods[3].Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: metav1.Time{Time: now.Add(-4300 * time.Millisecond)}}}
+
+ for i, pod := range pods {
+ if err := om.podsIndexer.Add(pod); err != nil {
+ t.Fatalf("could not add pod to the cache %d: %v", i, err)
+ }
+ var err error
+ if pods, err = om.setPodRunning(set, i); err != nil {
+ t.Fatalf("%d: %v", i, err)
+ }
+ }
+ err := ssc.syncStatefulSet(ctx, set, pods)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if set, err = om.setsLister.StatefulSets(set.Namespace).Get(set.Name); err != nil {
+ t.Fatalf("Could not get StatefulSet: %v", err)
+ }
+
+ // one pod is not ready
+ if set.Status.ReadyReplicas != 3 {
+ t.Errorf("Expected updated StatefulSet to contain ready replicas %v, got %v instead",
+ 3, set.Status.ReadyReplicas)
+ }
+ if set.Status.AvailableReplicas != 0 {
+ t.Errorf("Expected updated StatefulSet to contain available replicas %v, got %v instead",
+ 0, set.Status.AvailableReplicas)
+ }
+
+ if got, want := ssc.queue.Len(), 0; got != want {
+ t.Errorf("queue.Len() = %v, want %v", got, want)
+ }
+
+ // The StatefulSet should be re-queued after ~700ms to recompute .status.availableReplicas (the 900ms timeout leaves 200ms of slack for the test).
+ ktesting.Eventually(ctx, func(tCtx ktesting.TContext) int {
+ return ssc.queue.Len()
+ }).WithTimeout(900*time.Millisecond).
+ WithPolling(10*time.Millisecond).
Should(gomega.Equal(1), "StatefulSet should be re-queued to recompute .status.availableReplicas")
+}
+
func newFakeStatefulSetController(ctx context.Context, initialObjects ...runtime.Object) (*StatefulSetController, *StatefulPodControl, *fakeObjectManager, history.Interface) {
client := fake.NewSimpleClientset(initialObjects...)
informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
"fmt"
"regexp"
"strconv"
+ "time"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
return pod.Status.Phase == v1.PodRunning && podutil.IsPodReady(pod)
}
-func isRunningAndAvailable(pod *v1.Pod, minReadySeconds int32) bool {
- return podutil.IsPodAvailable(pod, minReadySeconds, metav1.Now())
+func isRunningAndAvailable(pod *v1.Pod, minReadySeconds int32, now time.Time) bool {
+ return podutil.IsPodAvailable(pod, minReadySeconds, metav1.Time{Time: now})
}
// isCreated returns true if pod has been created and is maintained by the API server
}
// isUnavailable returns true if pod is not available or if it is terminating
-func isUnavailable(pod *v1.Pod, minReadySeconds int32) bool {
- return !isRunningAndAvailable(pod, minReadySeconds) || isTerminating(pod)
+func isUnavailable(pod *v1.Pod, minReadySeconds int32, now time.Time) bool {
+ return !isRunningAndAvailable(pod, minReadySeconds, now) || isTerminating(pod)
}
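For intuition about the now parameter threaded through these helpers: podutil.IsPodAvailable reports a pod as available only once it has been ready strictly longer than minReadySeconds at the supplied evaluation time. An illustrative sketch with invented values:

	now := time.Now()
	pod := &v1.Pod{Status: v1.PodStatus{
		Phase: v1.PodRunning,
		Conditions: []v1.PodCondition{{
			Type:               v1.PodReady,
			Status:             v1.ConditionTrue,
			LastTransitionTime: metav1.Time{Time: now.Add(-3 * time.Second)},
		}},
	}}
	// false: the pod has been ready for only 3s, less than minReadySeconds=5.
	podutil.IsPodAvailable(pod, 5, metav1.Time{Time: now})
	// true: evaluated 3s later, the pod has been ready for 6s.
	podutil.IsPodAvailable(pod, 5, metav1.Time{Time: now.Add(3*time.Second)})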
// allowsBurst is true if the alpha burst annotation is set.