"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
+ apiruntime "k8s.io/apimachinery/pkg/runtime"
+ utilfeature "k8s.io/apiserver/pkg/util/feature"
+ featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2/ktesting"
fwk "k8s.io/kube-scheduler/framework"
+ "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/backend/cache"
"k8s.io/kubernetes/pkg/scheduler/framework"
args config.NodeResourcesBalancedAllocationArgs
runPreScore bool
wantPreScoreStatusCode fwk.Code
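+ // draObjects, when non-nil, seeds the fake DRA manager and turns on the DRAExtendedResource feature gate for the test case.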
+ draObjects []apiruntime.Object
}{
{
// bestEffort pods, skip in PreScore
args: config.NodeResourcesBalancedAllocationArgs{Resources: defaultResourceBalancedAllocationSet},
runPreScore: false,
},
+ {
+ // Node1 scores on 0-MaxNodeScore scale
+ // CPU Fraction: 3000 / 3500 = 0.8571
+ // Memory Fraction: 0 / 40000 = 0
+ // DRA Fraction: 1 / 8 = 0.125
+ // Fraction mean: (0.8571 + 0 + 0.125) / 3 = 0.3274
+ // Node1 std: sqrt(((0.8571 - 0.3274)**2 + (0 - 0.3274)**2 + (0.125 - 0.3274)**2) / 3) = 0.378
+ // Node1 Score: (1 - 0.378)*MaxNodeScore = 62
+ // Node2 scores on 0-MaxNodeScore scale
+ // CPU Fraction: 3000 / 3500 = 0.8571
+ // Memory Fraction: 5000 / 40000 = 0.125
+ // Node2 std: (0.8571 - 0.125) / 2 = 0.36605
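+ // (with two fractions, the standard deviation reduces to |cpuFraction - memFraction| / 2)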
+ // Node2 Score: (1 - 0.36605)*MaxNodeScore = 63
+ pod: st.MakePod().Req(map[v1.ResourceName]string{extendedResourceDRA: "1"}).Obj(),
+ nodes: []*v1.Node{makeNode("node1", 3500, 40000, nil), makeNode("node2", 3500, 40000, nil)},
+ expectedList: []fwk.NodeScore{{Name: "node1", Score: 62}, {Name: "node2", Score: 63}},
+ name: "include DRA resource on a node for balanced resource allocation",
+ pods: []*v1.Pod{
+ {Spec: cpuOnly},
+ {Spec: cpuAndMemory},
+ },
+ args: config.NodeResourcesBalancedAllocationArgs{Resources: []config.ResourceSpec{
+ {Name: string(v1.ResourceCPU), Weight: 1},
+ {Name: string(v1.ResourceMemory), Weight: 1},
+ {Name: extendedResourceName, Weight: 1},
+ }},
+ draObjects: []apiruntime.Object{
+ deviceClassWithExtendResourceName,
+ st.MakeResourceSlice("node1", "test-driver").
+ 	Device("device-1").Device("device-2").Device("device-3").Device("device-4").
+ 	Device("device-5").Device("device-6").Device("device-7").Device("device-8").
+ 	Obj(),
+ },
+ runPreScore: true,
+ },
+ {
+ // Node1 scores on 0-MaxNodeScore scale
+ // CPU Fraction: 3000 / 3500 = 0.8571
+ // Memory Fraction: 0 / 40000 = 0
+ // DRA Fraction: 1 / 8 = 0.125
+ // Fraction mean: (0.8571 + 0 + 0.125) / 3 = 0.3274
+ // Node1 std: sqrt(((0.8571 - 0.3274)**2 + (0 - 0.3274)**2 + (0.125 - 0.3274)**2) / 3) = 0.378
+ // Node1 Score: (1 - 0.378)*MaxNodeScore = 62
+ // Node2 scores on 0-MaxNodeScore scale
+ // CPU Fraction: 3000 / 3500 = 0.8571
+ // Memory Fraction: 5000 / 40000 = 0.125
+ // Node2 std: (0.8571 - 0.125) / 2 = 0.36605
+ // Node2 Score: (1 - 0.36605)*MaxNodeScore = 63
+ pod: st.MakePod().Req(map[v1.ResourceName]string{extendedResourceDRA: "1"}).Obj(),
+ nodes: []*v1.Node{makeNode("node1", 3500, 40000, nil), makeNode("node2", 3500, 40000, nil)},
+ expectedList: []fwk.NodeScore{{Name: "node1", Score: 62}, {Name: "node2", Score: 63}},
+ name: "include DRA resource on a node for balanced resource allocation if PreScore not called",
+ pods: []*v1.Pod{
+ {Spec: cpuOnly},
+ {Spec: cpuAndMemory},
+ },
+ args: config.NodeResourcesBalancedAllocationArgs{Resources: []config.ResourceSpec{
+ {Name: string(v1.ResourceCPU), Weight: 1},
+ {Name: string(v1.ResourceMemory), Weight: 1},
+ {Name: extendedResourceName, Weight: 1},
+ }},
+ draObjects: []apiruntime.Object{
+ deviceClassWithExtendResourceName,
+ st.MakeResourceSlice("node1", "test-driver").
+ 	Device("device-1").Device("device-2").Device("device-3").Device("device-4").
+ 	Device("device-5").Device("device-6").Device("device-7").Device("device-8").
+ 	Obj(),
+ },
+ runPreScore: false,
+ },
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
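+ // Enable the DRAExtendedResource feature gate only for test cases that provide DRA objects.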
+ featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DRAExtendedResource, test.draObjects != nil)
snapshot := cache.NewSnapshot(test.pods, test.nodes)
- _, ctx := ktesting.NewTestContext(t)
+ logger, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
fh, _ := runtime.NewFramework(ctx, nil, nil, runtime.WithSnapshotSharedLister(snapshot))
- p, _ := NewBalancedAllocation(ctx, &test.args, fh, feature.Features{})
+ p, _ := NewBalancedAllocation(ctx, &test.args, fh, feature.Features{
+ EnableDRAExtendedResource: test.draObjects != nil,
+ })
+
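+ // Replace the plugin's DRA manager with one backed by the test's DRA objects.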
+ draManager, err := newTestDRAManager(t, ctx, logger, test.draObjects...)
+ if err != nil {
+ t.Fatalf("failed to create test DRA manager: %v", err)
+ }
+ p.(*BalancedAllocation).draManager = draManager
+
state := framework.NewCycleState()
if test.runPreScore {
status := p.(fwk.PreScorePlugin).PreScore(ctx, state, test.pod, tf.BuildNodeInfos(test.nodes))
})
}
}
+
+func TestBalancedAllocationSignPod(t *testing.T) {
+ tests := map[string]struct {
+ pod *v1.Pod
+ enableDRAExtendedResource bool
+ expectedFragments []fwk.SignFragment
+ expectedStatusCode fwk.Code
+ }{
+ "pod with CPU and memory requests": {
+ pod: st.MakePod().Req(map[v1.ResourceName]string{
+ v1.ResourceCPU: "1000m",
+ v1.ResourceMemory: "2000",
+ }).Obj(),
+ enableDRAExtendedResource: false,
+ expectedFragments: []fwk.SignFragment{
+ {Key: fwk.ResourcesSignerName, Value: computePodResourceRequest(st.MakePod().Req(map[v1.ResourceName]string{
+ v1.ResourceCPU: "1000m",
+ v1.ResourceMemory: "2000",
+ }).Obj(), ResourceRequestsOptions{})},
+ },
+ expectedStatusCode: fwk.Success,
+ },
+ "best-effort pod with no requests": {
+ pod: st.MakePod().Obj(),
+ enableDRAExtendedResource: false,
+ expectedFragments: []fwk.SignFragment{
+ {Key: fwk.ResourcesSignerName, Value: computePodResourceRequest(st.MakePod().Obj(), ResourceRequestsOptions{})},
+ },
+ expectedStatusCode: fwk.Success,
+ },
+ "pod with multiple containers": {
+ pod: st.MakePod().Container("container1").Req(map[v1.ResourceName]string{
+ v1.ResourceCPU: "500m",
+ v1.ResourceMemory: "1000",
+ }).Container("container2").Req(map[v1.ResourceName]string{
+ v1.ResourceCPU: "1500m",
+ v1.ResourceMemory: "3000",
+ }).Obj(),
+ enableDRAExtendedResource: false,
+ expectedFragments: []fwk.SignFragment{
+ {Key: fwk.ResourcesSignerName, Value: computePodResourceRequest(st.MakePod().Container("container1").Req(map[v1.ResourceName]string{
+ v1.ResourceCPU: "500m",
+ v1.ResourceMemory: "1000",
+ }).Container("container2").Req(map[v1.ResourceName]string{
+ v1.ResourceCPU: "1500m",
+ v1.ResourceMemory: "3000",
+ }).Obj(), ResourceRequestsOptions{})},
+ },
+ expectedStatusCode: fwk.Success,
+ },
+ "DRA extended resource enabled - returns unschedulable": {
+ pod: st.MakePod().Req(map[v1.ResourceName]string{
+ v1.ResourceCPU: "1000m",
+ v1.ResourceMemory: "2000",
+ }).Obj(),
+ enableDRAExtendedResource: true,
+ expectedFragments: nil,
+ expectedStatusCode: fwk.Unschedulable,
+ },
+ }
+
+ for name, test := range tests {
+ t.Run(name, func(t *testing.T) {
+ _, ctx := ktesting.NewTestContext(t)
+
+ p, err := NewBalancedAllocation(ctx, &config.NodeResourcesBalancedAllocationArgs{}, nil, feature.Features{
+ EnableDRAExtendedResource: test.enableDRAExtendedResource,
+ })
+ if err != nil {
+ t.Fatalf("failed to create plugin: %v", err)
+ }
+
+ ba := p.(*BalancedAllocation)
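+ // SignPod should emit the pod's resource-request fragment, or return Unschedulable when DRA extended resources are enabled.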
+ fragments, status := ba.SignPod(ctx, test.pod)
+
+ if status.Code() != test.expectedStatusCode {
+ t.Errorf("unexpected status code, want: %v, got: %v, message: %v", test.expectedStatusCode, status.Code(), status.Message())
+ }
+
+ if test.expectedStatusCode == fwk.Success {
+ if diff := cmp.Diff(test.expectedFragments, fragments); diff != "" {
+ t.Errorf("unexpected fragments, diff (-want,+got):\n%s", diff)
+ }
+ }
+ })
+ }
+}
import (
"context"
+ "fmt"
"testing"
"github.com/google/go-cmp/cmp"
"k8s.io/dynamic-resource-allocation/deviceclass/extendedresourcecache"
"k8s.io/dynamic-resource-allocation/resourceslice/tracker"
"k8s.io/dynamic-resource-allocation/structured"
+ "k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
fwk "k8s.io/kube-scheduler/framework"
"k8s.io/kubernetes/pkg/features"
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DRAExtendedResource, tc.enableDRAExtendedResource)
- client := fake.NewClientset(tc.objects...)
- informerFactory := informers.NewSharedInformerFactory(client, 0)
- resourceSliceTrackerOpts := tracker.Options{
- SliceInformer: informerFactory.Resource().V1().ResourceSlices(),
- TaintInformer: informerFactory.Resource().V1alpha3().DeviceTaintRules(),
- ClassInformer: informerFactory.Resource().V1().DeviceClasses(),
- KubeClient: client,
- }
- resourceSliceTracker, err := tracker.StartTracker(tCtx, resourceSliceTrackerOpts)
+ draManager, err := newTestDRAManager(t, tCtx, logger, tc.objects...)
if err != nil {
- t.Fatalf("couldn't start resource slice tracker: %v", err)
+ t.Fatalf("failed to create fake DRA manager: %v", err)
}
- draManager := dynamicresources.NewDRAManager(
- tCtx,
- assumecache.NewAssumeCache(
- logger,
- informerFactory.Resource().V1().ResourceClaims().Informer(),
- "resource claim",
- "",
- nil),
- resourceSliceTracker,
- informerFactory)
-
- if tc.enableDRAExtendedResource {
- cache := draManager.DeviceClassResolver().(*extendedresourcecache.ExtendedResourceCache)
- if _, err := informerFactory.Resource().V1().DeviceClasses().Informer().AddEventHandler(cache); err != nil {
- logger.Error(err, "failed to add device class informer event handler")
- }
- }
-
- informerFactory.Start(tCtx.Done())
- t.Cleanup(func() {
- // Now we can wait for all goroutines to stop.
- informerFactory.Shutdown()
- })
- informerFactory.WaitForCacheSync(tCtx.Done())
nodeInfo := framework.NewNodeInfo()
nodeInfo.SetNode(tc.node)
}
}
+
+// newTestDRAManager builds a DefaultDRAManager backed by a fake client and
+// informers seeded with the given objects, for use in tests.
+func newTestDRAManager(t *testing.T, ctx context.Context, logger klog.Logger, objects ...apiruntime.Object) (*dynamicresources.DefaultDRAManager, error) {
+ client := fake.NewClientset(objects...)
+ informerFactory := informers.NewSharedInformerFactory(client, 0)
+ resourceSliceTrackerOpts := tracker.Options{
+ SliceInformer: informerFactory.Resource().V1().ResourceSlices(),
+ TaintInformer: informerFactory.Resource().V1alpha3().DeviceTaintRules(),
+ ClassInformer: informerFactory.Resource().V1().DeviceClasses(),
+ KubeClient: client,
+ }
+ resourceSliceTracker, err := tracker.StartTracker(ctx, resourceSliceTrackerOpts)
+ if err != nil {
+ return nil, fmt.Errorf("couldn't start resource slice tracker: %w", err)
+ }
+ draManager := dynamicresources.NewDRAManager(
+ ctx,
+ assumecache.NewAssumeCache(
+ logger,
+ informerFactory.Resource().V1().ResourceClaims().Informer(),
+ "resource claim",
+ "",
+ nil),
+ resourceSliceTracker,
+ informerFactory)
+
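+ // Register the extended resource cache as a DeviceClass informer event handler so class updates reach the resolver.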
+ cache := draManager.DeviceClassResolver().(*extendedresourcecache.ExtendedResourceCache)
+ if _, err := informerFactory.Resource().V1().DeviceClasses().Informer().AddEventHandler(cache); err != nil {
+ return nil, fmt.Errorf("failed to add device class informer event handler: %w", err)
+ }
+
+ informerFactory.Start(ctx.Done())
+ t.Cleanup(func() {
+ // Now we can wait for all goroutines to stop.
+ informerFactory.Shutdown()
+ })
+ informerFactory.WaitForCacheSync(ctx.Done())
+
+ return draManager, nil
+}
+
// getCachedDeviceMatch checks the cache for a DeviceMatches result.
// It returns (matches, found).
func (r *resourceAllocationScorer) getCachedDeviceMatch(expression string, driver string, poolName string, deviceName string) (bool, bool) {