discoveryFunc := resourceQuotaControllerDiscoveryClient.ServerPreferredNamespacedResources
listerFuncForResource := generic.ListerFuncForResourceFunc(controllerContext.InformerFactory.ForResource)
- quotaConfiguration := quotainstall.NewQuotaConfigurationForControllers(listerFuncForResource, controllerContext.InformerFactory)
+ quotaConfiguration, err := quotainstall.NewQuotaConfigurationForControllers(listerFuncForResource, controllerContext.InformerFactory)
+ if err != nil {
+ return nil, err
+ }
resourceQuotaControllerOptions := &resourcequotacontroller.ControllerOptions{
QuotaClient: resourceQuotaControllerClient.CoreV1(),
func setupQuotaController(t *testing.T, kubeClient kubernetes.Interface, lister quota.ListerForResourceFunc, discoveryFunc NamespacedResourcesFunc) quotaController {
informerFactory := informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
- quotaConfiguration := install.NewQuotaConfigurationForControllers(lister, informerFactory)
+ quotaConfiguration, err := install.NewQuotaConfigurationForControllers(lister, informerFactory)
+ if err != nil {
+ t.Fatal(err)
+ }
alwaysStarted := make(chan struct{})
close(alwaysStarted)
resourceQuotaControllerOptions := &ControllerOptions{
webhookAuthResolverWrapper := webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, egressSelector, c.LoopbackClientConfig, tp)
webhookPluginInitializer := webhookinit.NewPluginInitializer(webhookAuthResolverWrapper, serviceResolver)
+ quotaConfiguration, err := quotainstall.NewQuotaConfigurationForAdmission(c.ExternalInformers)
+ if err != nil {
+ return nil, err
+ }
kubePluginInitializer := NewPluginInitializer(
- quotainstall.NewQuotaConfigurationForAdmission(c.ExternalInformers),
+ quotaConfiguration,
exclusion.Excluded(),
)
import (
"context"
+ "fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
}
// NewEvaluators returns the list of static evaluators that manage more than counts
-func NewEvaluators(f quota.ListerForResourceFunc, i informers.SharedInformerFactory) []quota.Evaluator {
+func NewEvaluators(f quota.ListerForResourceFunc, i informers.SharedInformerFactory) ([]quota.Evaluator, error) {
// these evaluators have special logic
result := []quota.Evaluator{
NewPodEvaluator(f, clock.RealClock{}),
NewServiceEvaluator(f),
NewPersistentVolumeClaimEvaluator(f),
}
+ var claimGetter resourceClaimPodOwnerGetter
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
var podLister corev1listers.PodLister
var deviceClassMapping *extendedresourcecache.ExtendedResourceCache
logger := klog.FromContext(context.Background())
deviceClassMapping = extendedresourcecache.NewExtendedResourceCache(logger)
if _, err := i.Resource().V1().DeviceClasses().Informer().AddEventHandler(deviceClassMapping); err != nil {
- logger.Error(err, "failed to add device class informer event handler")
+ return nil, fmt.Errorf("failed to add device class informer event handler: %w", err)
+ }
+ var err error
+ claimGetter, err = makeResourceClaimPodOwnerGetter(i.Resource().V1().ResourceClaims())
+ if err != nil {
+ return nil, err
}
}
- result = append(result, NewResourceClaimEvaluator(f, deviceClassMapping, podLister))
+ result = append(result, NewResourceClaimEvaluator(f, deviceClassMapping, podLister, claimGetter))
}
// these evaluators require an alias for backwards compatibility
result = append(result,
generic.NewObjectCountEvaluator(gvr.GroupResource(), generic.ListResourceUsingListerFunc(f, gvr), alias))
}
- return result
+ return result, nil
}
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/admission"
quota "k8s.io/apiserver/pkg/quota/v1"
"k8s.io/apiserver/pkg/quota/v1/generic"
utilfeature "k8s.io/apiserver/pkg/util/feature"
+ v1 "k8s.io/client-go/informers/resource/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
+ "k8s.io/client-go/tools/cache"
"k8s.io/dynamic-resource-allocation/deviceclass/extendedresourcecache"
resourceinternal "k8s.io/kubernetes/pkg/apis/resource"
resourceversioned "k8s.io/kubernetes/pkg/apis/resource/v1"
}
// NewResourceClaimEvaluator returns an evaluator that can evaluate resource claims
-func NewResourceClaimEvaluator(f quota.ListerForResourceFunc, m *extendedresourcecache.ExtendedResourceCache, podsGetter corev1listers.PodLister) quota.Evaluator {
+func NewResourceClaimEvaluator(f quota.ListerForResourceFunc, m *extendedresourcecache.ExtendedResourceCache, podsGetter corev1listers.PodLister, claimGetter resourceClaimPodOwnerGetter) quota.Evaluator {
listFuncByNamespace := generic.ListResourceUsingListerFunc(f, resourceapi.SchemeGroupVersion.WithResource("resourceclaims"))
- claimEvaluator := &claimEvaluator{listFuncByNamespace: listFuncByNamespace, deviceClassMapping: m, podsGetter: podsGetter}
+ claimEvaluator := &claimEvaluator{listFuncByNamespace: listFuncByNamespace, deviceClassMapping: m, podsGetter: podsGetter, claimGetter: claimGetter}
return claimEvaluator
}
deviceClassMapping *extendedresourcecache.ExtendedResourceCache
// podsGetter is used to get pods
podsGetter corev1listers.PodLister
+ // claimGetter is used to get claims for extended resources by namespace and pod owner uid
+ claimGetter resourceClaimPodOwnerGetter
}
// Constraints verifies that all required resources are present on the item.
if claim.Annotations[resourceapi.ExtendedResourceClaimAnnotation] != "true" {
return nil
}
- controllerRef := metav1.GetControllerOf(claim)
+ controllerRef := metav1.GetControllerOfNoCopy(claim)
if controllerRef == nil {
return nil
}
if pod.Status.ExtendedResourceClaimStatus != nil && pod.Status.ExtendedResourceClaimStatus.ResourceClaimName != claim.Name {
return nil
}
+ // if the pod doesn't identify its extended resource claim, make sure we're the first or only extended resource claim for this pod
+ if pod.Status.ExtendedResourceClaimStatus == nil {
+ ownedClaims, err := p.claimGetter(claim.Namespace, pod.UID)
+ if err != nil {
+ return nil
+ }
+ for _, ownedClaim := range ownedClaims {
+ switch ownedClaim.CreationTimestamp.Time.Compare(claim.CreationTimestamp.Time) {
+ case -1:
+ // There's another owned claim created earlier than this one.
+ // Don't exempt this one based on pod usage.
+ return nil
+ case 0:
+ if ownedClaim.Name < claim.Name {
+ // There's another owned claim created at the same time as this one with an earlier name.
+ // Don't exempt this one based on pod usage.
+ return nil
+ }
+ case 1:
+ continue
+ }
+ }
+ }
+
quotaReqs, err := PodUsageFunc(pod, clock.RealClock{})
if err != nil {
return nil
}
return claim, nil
}
+
+const extendedResourceClaimPodOwnerIndexName = "extendedResourceClaimPodUIDOwner"
+
+func resourceClaimPodOwnerKey(namespace string, podUID types.UID) string {
+ return namespace + "/" + string(podUID)
+}
+
+type resourceClaimPodOwnerGetter func(namespace string, podUID types.UID) ([]*resourceapi.ResourceClaim, error)
+
+func makeResourceClaimPodOwnerGetter(resourceClaimInformer v1.ResourceClaimInformer) (resourceClaimPodOwnerGetter, error) {
+ indexers := cache.Indexers{extendedResourceClaimPodOwnerIndexName: extendedResourceClaimPodUIDOwnerIndex}
+ if err := resourceClaimInformer.Informer().AddIndexers(indexers); err != nil {
+ _, exists := resourceClaimInformer.Informer().GetIndexer().GetIndexers()[extendedResourceClaimPodOwnerIndexName]
+ if !exists {
+ return nil, fmt.Errorf("failed to add resource claim pod owner indexer: %w", err)
+ }
+ }
+ indexer := resourceClaimInformer.Informer().GetIndexer()
+ return func(namespace string, podUID types.UID) ([]*resourceapi.ResourceClaim, error) {
+ objects, err := indexer.ByIndex(extendedResourceClaimPodOwnerIndexName, resourceClaimPodOwnerKey(namespace, podUID))
+ if err != nil {
+ return nil, err
+ }
+ claims := make([]*resourceapi.ResourceClaim, 0, len(objects))
+ for _, obj := range objects {
+ if claim, ok := obj.(*resourceapi.ResourceClaim); ok {
+ claims = append(claims, claim)
+ } else {
+ return nil, fmt.Errorf("failed to get resource claim from indexer")
+ }
+ }
+ return claims, nil
+ }, nil
+}
+func extendedResourceClaimPodUIDOwnerIndex(obj interface{}) ([]string, error) {
+ claim, ok := obj.(*resourceapi.ResourceClaim)
+ if !ok {
+ return nil, nil
+ }
+ if claim.Annotations[resourceapi.ExtendedResourceClaimAnnotation] != "true" {
+ return nil, nil
+ }
+ controllerRef := metav1.GetControllerOfNoCopy(claim)
+ if controllerRef == nil {
+ return nil, nil
+ }
+ if controllerRef.Kind != "Pod" || controllerRef.APIVersion != "v1" {
+ return nil, nil
+ }
+ return []string{resourceClaimPodOwnerKey(claim.Namespace, controllerRef.UID)}, nil
+}
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/admission"
quota "k8s.io/apiserver/pkg/quota/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
},
},
})
+ nilStatusExtendedResourceClaimExternal, err := toExternalResourceClaimOrError(nilStatusExtendedResourceClaim)
+ if err != nil {
+ t.Fatal(err)
+ }
+ nilStatusExtendedResourceClaim.Name = "foo-copy"
initExtendedResourceClaim := testResourceClaim("foo", "ns", true, "pod-init", api.ResourceClaimSpec{
Devices: api.DeviceClaim{
Requests: []api.DeviceRequest{
informerFactory := informers.NewSharedInformerFactory(client, 0)
deviceclassmapping := extendedresourcecache.NewExtendedResourceCache(logger)
if _, err := informerFactory.Resource().V1().DeviceClasses().Informer().AddEventHandler(deviceclassmapping); err != nil {
- logger.Error(err, "failed to add device class informer event handler")
+ t.Fatal(err)
+ }
+ var otherOwnedClaims []*resourceapi.ResourceClaim
+ claimGetter := func(namespace string, podUID types.UID) ([]*resourceapi.ResourceClaim, error) {
+ return otherOwnedClaims, nil
}
- evaluatorWithDeviceMapping := NewResourceClaimEvaluator(nil, deviceclassmapping, informerFactory.Core().V1().Pods().Lister())
+ evaluatorWithDeviceMapping := NewResourceClaimEvaluator(nil, deviceclassmapping, informerFactory.Core().V1().Pods().Lister(), claimGetter)
informerFactory.Start(tCtx.Done())
t.Cleanup(func() {
// wait for informer sync
time.Sleep(1 * time.Second)
- evaluator := NewResourceClaimEvaluator(nil, nil, nil)
+ evaluator := NewResourceClaimEvaluator(nil, nil, nil, claimGetter)
testCases := map[string]struct {
- evaluator quota.Evaluator
- claim *api.ResourceClaim
- usage corev1.ResourceList
- errMsg string
+ evaluator quota.Evaluator
+ claim *api.ResourceClaim
+ otherClaims []*resourceapi.ResourceClaim
+ usage corev1.ResourceList
+ errMsg string
}{
"implicit-extended-resource-claim": {
evaluator: evaluatorWithDeviceMapping,
"requests.example.com/gpu": resource.MustParse("1"),
},
},
- "ni-status-extended-resource-claim": {
+ "nil-status-extended-resource-claim": {
evaluator: evaluatorWithDeviceMapping,
claim: nilStatusExtendedResourceClaim,
usage: corev1.ResourceList{
"requests.example.com/gpu": resource.MustParse("1"),
},
},
+ // both claims set the same pod as owner, the second claim cannot get the usage
+ // subtraction from pod requests.
+ "nil-status-two-extended-resource-claims": {
+ evaluator: evaluatorWithDeviceMapping,
+ claim: nilStatusExtendedResourceClaim,
+ otherClaims: []*resourceapi.ResourceClaim{
+ nilStatusExtendedResourceClaimExternal,
+ },
+ usage: corev1.ResourceList{
+ "count/resourceclaims.resource.k8s.io": resource.MustParse("1"),
+ "gpu.deviceclass.resource.k8s.io/devices": resource.MustParse("2"),
+ "requests.deviceclass.resource.kubernetes.io/gpu": resource.MustParse("2"),
+ "requests.example.com/gpu": resource.MustParse("2"),
+ },
+ },
"init-extended-resource-claim": {
evaluator: evaluatorWithDeviceMapping,
claim: initExtendedResourceClaim,
}
for testName, testCase := range testCases {
t.Run(testName, func(t *testing.T) {
+ otherOwnedClaims = testCase.otherClaims
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DRAExtendedResource, true)
if testCase.evaluator == nil {
testCase.evaluator = evaluator
if _, err := informerFactory.Resource().V1().DeviceClasses().Informer().AddEventHandler(deviceclassmapping); err != nil {
logger.Error(err, "failed to add device class informer event handler")
}
- evaluator := NewResourceClaimEvaluator(nil, deviceclassmapping, informerFactory.Core().V1().Pods().Lister())
+ evaluator := NewResourceClaimEvaluator(nil, deviceclassmapping, informerFactory.Core().V1().Pods().Lister(), nil)
informerFactory.Start(tCtx.Done())
t.Cleanup(func() {
}
func TestResourceClaimEvaluatorHandles(t *testing.T) {
- evaluator := NewResourceClaimEvaluator(nil, nil, nil)
+ evaluator := NewResourceClaimEvaluator(nil, nil, nil, nil)
testCases := []struct {
name string
attrs admission.Attributes
)
// NewQuotaConfigurationForAdmission returns a quota configuration for admission control.
-func NewQuotaConfigurationForAdmission(i informers.SharedInformerFactory) quota.Configuration {
- evaluators := core.NewEvaluators(nil, i)
- return generic.NewConfiguration(evaluators, DefaultIgnoredResources())
+func NewQuotaConfigurationForAdmission(i informers.SharedInformerFactory) (quota.Configuration, error) {
+ evaluators, err := core.NewEvaluators(nil, i)
+ if err != nil {
+ return nil, err
+ }
+ return generic.NewConfiguration(evaluators, DefaultIgnoredResources()), nil
}
// NewQuotaConfigurationForControllers returns a quota configuration for controllers.
-func NewQuotaConfigurationForControllers(f quota.ListerForResourceFunc, i informers.SharedInformerFactory) quota.Configuration {
- evaluators := core.NewEvaluators(f, i)
- return generic.NewConfiguration(evaluators, DefaultIgnoredResources())
+func NewQuotaConfigurationForControllers(f quota.ListerForResourceFunc, i informers.SharedInformerFactory) (quota.Configuration, error) {
+ evaluators, err := core.NewEvaluators(f, i)
+ if err != nil {
+ return nil, err
+ }
+ return generic.NewConfiguration(evaluators, DefaultIgnoredResources()), nil
}
// ignoredResources are ignored by quota by default
if config == nil {
config = &resourcequotaapi.Configuration{}
}
- quotaConfiguration := install.NewQuotaConfigurationForAdmission(nil)
+ quotaConfiguration, err := install.NewQuotaConfigurationForAdmission(nil)
+ if err != nil {
+ return nil, err
+ }
handler, err := resourcequota.NewResourceQuota(config, 5)
if err != nil {
discoveryFunc := clientset.Discovery().ServerPreferredNamespacedResources
listerFuncForResource := generic.ListerFuncForResourceFunc(informers.ForResource)
- qc := quotainstall.NewQuotaConfigurationForControllers(listerFuncForResource, nil)
+ qc, err := quotainstall.NewQuotaConfigurationForControllers(listerFuncForResource, nil)
+ if err != nil {
+ t.Fatalf("unexpected err: %v", err)
+ }
informersStarted := make(chan struct{})
resourceQuotaControllerOptions := &resourcequotacontroller.ControllerOptions{
QuotaClient: clientset.CoreV1(),
discoveryFunc := clientset.Discovery().ServerPreferredNamespacedResources
listerFuncForResource := generic.ListerFuncForResourceFunc(informers.ForResource)
- qc := quotainstall.NewQuotaConfigurationForControllers(listerFuncForResource, nil)
+ qc, err := quotainstall.NewQuotaConfigurationForControllers(listerFuncForResource, nil)
+ if err != nil {
+ t.Fatalf("unexpected err: %v", err)
+ }
informersStarted := make(chan struct{})
resourceQuotaControllerOptions := &resourcequotacontroller.ControllerOptions{
QuotaClient: clientset.CoreV1(),
discoveryFunc := clientset.Discovery().ServerPreferredNamespacedResources
listerFuncForResource := generic.ListerFuncForResourceFunc(informers.ForResource)
- qc := quotainstall.NewQuotaConfigurationForControllers(listerFuncForResource, nil)
+ qc, err := quotainstall.NewQuotaConfigurationForControllers(listerFuncForResource, nil)
+ if err != nil {
+ t.Fatalf("unexpected err: %v", err)
+ }
informersStarted := make(chan struct{})
resourceQuotaControllerOptions := &resourcequotacontroller.ControllerOptions{
QuotaClient: clientset.CoreV1(),