]> git.feebdaed.xyz Git - 0xmirror/kubernetes.git/commitdiff
DRA ExtendedResourceCache: avoid risk of flakes
authorPatrick Ohly <patrick.ohly@intel.com>
Thu, 25 Dec 2025 18:44:24 +0000 (19:44 +0100)
committerPatrick Ohly <patrick.ohly@intel.com>
Thu, 25 Dec 2025 18:59:20 +0000 (19:59 +0100)
The unit test flaked at least once so far in the CI. Can be reproduced
locally by running many instances of the test in parallel.

There were two potential root causes:
- The watch must be set up completely before creating objects (a fake
  client-go limitation).
- The one second timeout might be too small on a loaded system.

By running the tests in a synctest bubble with synctest.Wait calls at
the right places both problems are avoided. As a welcome side effect
the test also completes faster.

Before:

    $ go test k8s.io/dynamic-resource-allocation/deviceclass/extendedresourcecache
    ok   k8s.io/dynamic-resource-allocation/deviceclass/extendedresourcecache 9.300s

    $ stress -p 512 ./extendedresourcecache.test
    5s: 0 runs so far, 0 failures, 512 active
    10s: 8 runs so far, 0 failures, 512 active

    /tmp/go-stress-20251225T195231-2875181701
    --- FAIL: TestExtendedResourceCache (9.14s)
        extendedresourcecache_test.go:234: Expected to find device class 'gpu-class' for 'example.com/gpu', got nil
        extendedresourcecache_test.go:241: Expected to find device class 'fpga-class' for 'deviceclass.resource.kubernetes.io/fpga-class', got nil
        extendedresourcecache_test.go:247: Expected default mapping for gpu-class
        extendedresourcecache.go:197: I1225 19:52:36.332170] Updated extended resource cache for explicit mapping extendedResource="example.com/gpu" deviceClass="gpu-class-3"
        extendedresourcecache.go:204: I1225 19:52:36.332216] Updated extended resource cache for default mapping extendedResource="deviceclass.resource.kubernetes.io/gpu-class-3" deviceClass="gpu-class-3"
        extendedresourcecache.go:220: I1225 19:52:36.332245] Updated device class mapping deviceClass="gpu-class-3" extendedResource="example.com/gpu"
        extendedresourcecache_test.go:260: Expected to find device class 'gpu-class' for 'example.com/gpu', got &DeviceClass{ObjectMeta:{gpu-class-3      0 2025-12-24 19:52:35.32560833 +0100 CET m=-86396.691266715 <nil> <nil> map[] map[] [] [] [{unknown Update resource.k8s.io/v1 2025-12-25 18:52:36.332067439 +0000 UTC FieldsV1 {"f:spec":{"f:extendedResourceName":{}}} }]},Spec:DeviceClassSpec{Selectors:[]DeviceSelector{},Config:[]DeviceClassConfiguration{},ExtendedResourceName:*example.com/gpu,},}
        extendedresourcecache.go:197: I1225 19:52:37.336135] Updated extended resource cache for explicit mapping extendedResource="example.com/gpu" deviceClass="gpu-class-4"
        extendedresourcecache.go:204: I1225 19:52:37.336169] Updated extended resource cache for default mapping extendedResource="deviceclass.resource.kubernetes.io/gpu-class-4" deviceClass="gpu-class-4"
        extendedresourcecache.go:220: I1225 19:52:37.336192] Updated device class mapping deviceClass="gpu-class-4" extendedResource="example.com/gpu"
        extendedresourcecache.go:197: I1225 19:52:38.340121] Updated extended resource cache for explicit mapping extendedResource="example

After:

    ok   k8s.io/dynamic-resource-allocation/deviceclass/extendedresourcecache 0.064s
    ...
    2m0s: 7063 runs so far, 0 failures, 512 active

staging/src/k8s.io/dynamic-resource-allocation/deviceclass/extendedresourcecache/extendedresourcecache.go
staging/src/k8s.io/dynamic-resource-allocation/deviceclass/extendedresourcecache/extendedresourcecache_test.go

index f70124fd343ce30c99c66181b401baa8f684fbfe..96d27fc433698334f82f25f7d67c6f718f6fc250 100644 (file)
@@ -102,6 +102,7 @@ func (c *ExtendedResourceCache) OnAdd(obj interface{}, isInInitialList bool) {
                utilruntime.HandleErrorWithLogger(c.logger, nil, "Expected DeviceClass", "actual", fmt.Sprintf("%T", obj))
                return
        }
+       c.logger.V(5).Info("DeviceClass added", "deviceClass", klog.KObj(deviceClass))
        c.updateResourceName2class(deviceClass, nil)
        c.updateClass2ResourceName(deviceClass)
 
@@ -122,6 +123,7 @@ func (c *ExtendedResourceCache) OnUpdate(oldObj, newObj interface{}) {
                utilruntime.HandleErrorWithLogger(c.logger, nil, "Expected DeviceClass", "actual", fmt.Sprintf("%T", oldObj))
                return
        }
+       c.logger.V(5).Info("DeviceClass updated", "deviceClass", klog.KObj(deviceClass))
        c.updateResourceName2class(deviceClass, oldDeviceClass)
        c.updateClass2ResourceName(deviceClass)
 
@@ -140,6 +142,7 @@ func (c *ExtendedResourceCache) OnDelete(obj interface{}) {
                utilruntime.HandleErrorWithLogger(c.logger, nil, "Expected DeviceClass", "actual", fmt.Sprintf("%T", obj))
                return
        }
+       c.logger.V(5).Info("DeviceClass deleted", "deviceClass", klog.KObj(deviceClass))
        c.removeResourceName2class(deviceClass)
        c.removeClass2ResourceName(deviceClass)
 
index 58171d77d266654fd5f937a27e401a8c16d5d0a0..61889507f9a1a84a946b82fac19d298e7b833e7f 100644 (file)
@@ -19,6 +19,7 @@ package extendedresourcecache
 import (
        "context"
        "testing"
+       "testing/synctest"
        "time"
 
        v1 "k8s.io/api/core/v1"
@@ -28,6 +29,7 @@ import (
        "k8s.io/client-go/kubernetes/fake"
        "k8s.io/client-go/tools/cache"
        "k8s.io/klog/v2/ktesting"
+       _ "k8s.io/klog/v2/ktesting/init" // Add command line flags.
        "k8s.io/utils/ptr"
 )
 
@@ -140,25 +142,9 @@ func TestHandlers(t *testing.T) {
        erCache.OnDelete(updatedClass)
 }
 
-func TestExtendedResourceCache(t *testing.T) {
-       logger, ctx := ktesting.NewTestContext(t)
-       tCtx, tCancel := context.WithCancel(ctx)
-       client := fake.NewClientset()
-       informerFactory := informers.NewSharedInformerFactory(client, 0)
-
-       deviceClassInformer := informerFactory.Resource().V1().DeviceClasses()
-       cache := NewExtendedResourceCache(logger)
-       if _, err := deviceClassInformer.Informer().AddEventHandler(cache); err != nil {
-               logger.Error(err, "Failed to add event handler for device classes")
-       }
-       informerFactory.Start(tCtx.Done())
-       t.Cleanup(func() {
-               // Need to cancel before waiting for the shutdown.
-               tCancel()
-               // Now we can wait for all goroutines to stop.
-               informerFactory.Shutdown()
-       })
-       informerFactory.WaitForCacheSync(tCtx.Done())
+func TestExtendedResourceCache(t *testing.T) { synctest.Test(t, testExtendedResourceCache) }
+func testExtendedResourceCache(t *testing.T) {
+       tCtx, client, cache := setup(t)
 
        // Test with a device class that has an explicit extended resource name
        now := time.Now()
@@ -226,7 +212,7 @@ func TestExtendedResourceCache(t *testing.T) {
        if err != nil {
                t.Fatalf("Failed to create device class: %v", err)
        }
-       time.Sleep(1 * time.Second)
+       synctest.Wait()
 
        // Verify explicit mapping
        deviceClass := cache.GetDeviceClass("example.com/gpu")
@@ -252,7 +238,7 @@ func TestExtendedResourceCache(t *testing.T) {
        if err != nil {
                t.Fatalf("Failed to create device class: %v", err)
        }
-       time.Sleep(1 * time.Second)
+       synctest.Wait()
 
        // should keep deviceClass1, since it is newer than deviceClass3
        deviceClass = cache.GetDeviceClass("example.com/gpu")
@@ -265,7 +251,7 @@ func TestExtendedResourceCache(t *testing.T) {
        if err != nil {
                t.Fatalf("Failed to create device class: %v", err)
        }
-       time.Sleep(1 * time.Second)
+       synctest.Wait()
 
        // deviceClass4 replaces deviceClass1, since it is newer with the same example.com/gpu extended resource name
        deviceClass = cache.GetDeviceClass("example.com/gpu")
@@ -279,7 +265,7 @@ func TestExtendedResourceCache(t *testing.T) {
        if err != nil {
                t.Fatalf("Failed to create device class: %v", err)
        }
-       time.Sleep(1 * time.Second)
+       synctest.Wait()
 
        // deviceClass0 replaces deviceClass4, it is created at the same time as deviceClass4, but its name is
        // alphabetically ordered earlier
@@ -295,7 +281,7 @@ func TestExtendedResourceCache(t *testing.T) {
        if err != nil {
                t.Fatalf("Failed to update device class: %v", err)
        }
-       time.Sleep(1 * time.Second)
+       synctest.Wait()
 
        // Should have the new mapping
        deviceClass = cache.GetDeviceClass("test.com/gpu")
@@ -312,7 +298,7 @@ func TestExtendedResourceCache(t *testing.T) {
        if err != nil {
                t.Fatalf("Failed to delete device class: %v", err)
        }
-       time.Sleep(1 * time.Second)
+       synctest.Wait()
 
        deviceClass = cache.GetDeviceClass("test.com/gpu")
        if deviceClass != nil {
@@ -324,24 +310,9 @@ func TestExtendedResourceCache(t *testing.T) {
        }
 }
 
-func TestDeviceClassMapping(t *testing.T) {
-       logger, ctx := ktesting.NewTestContext(t)
-       tCtx, tCancel := context.WithCancel(ctx)
-
-       client := fake.NewClientset()
-       informerFactory := informers.NewSharedInformerFactory(client, 0)
-       cache := NewExtendedResourceCache(logger)
-       if _, err := informerFactory.Resource().V1().DeviceClasses().Informer().AddEventHandler(cache); err != nil {
-               logger.Error(err, "failed to add device class informer event handler")
-       }
-       informerFactory.Start(tCtx.Done())
-       t.Cleanup(func() {
-               // Need to cancel before waiting for the shutdown.
-               tCancel()
-               // Now we can wait for all goroutines to stop.
-               informerFactory.Shutdown()
-       })
-       informerFactory.WaitForCacheSync(tCtx.Done())
+func TestDeviceClassMapping(t *testing.T) { synctest.Test(t, testDeviceClassMapping) }
+func testDeviceClassMapping(t *testing.T) {
+       tCtx, client, cache := setup(t)
 
        deviceClass1 := &resourceapi.DeviceClass{
                ObjectMeta: metav1.ObjectMeta{
@@ -368,7 +339,8 @@ func TestDeviceClassMapping(t *testing.T) {
                t.Fatalf("Failed to create device class: %v", err)
        }
 
-       time.Sleep(1 * time.Second)
+       // Wait for background goroutines to handle the new classes.
+       synctest.Wait()
        name := cache.GetExtendedResource("gpu-class")
        if name != "example.com/gpu" {
                t.Errorf("Expected to find device class 'gpu-class', got %s", name)
@@ -386,7 +358,7 @@ func TestDeviceClassMapping(t *testing.T) {
                t.Fatalf("Failed to update device class: %v", err)
        }
 
-       time.Sleep(1 * time.Second)
+       synctest.Wait()
        name = cache.GetExtendedResource("gpu-class")
        if name != "my.com/gpu" {
                t.Errorf("Expected to find device class 'gpu-class' with  'my.com/gpu' after modification, got %s", name)
@@ -398,9 +370,49 @@ func TestDeviceClassMapping(t *testing.T) {
                t.Fatalf("Failed to delete device class: %v", err)
        }
 
-       time.Sleep(1 * time.Second)
+       synctest.Wait()
        name = cache.GetExtendedResource("gpu-class")
        if name != "" {
                t.Error("Expected 'gpu-class' not found after deletion")
        }
 }
+
+func setup(t *testing.T) (context.Context, *fake.Clientset, *ExtendedResourceCache) {
+       logger, ctx := ktesting.NewTestContext(t)
+       ctx, cancel := context.WithCancel(ctx)
+       t.Cleanup(cancel)
+
+       client := fake.NewClientset()
+       informerFactory := informers.NewSharedInformerFactory(client, 0)
+       ec := NewExtendedResourceCache(logger)
+       handle, err := informerFactory.Resource().V1().DeviceClasses().Informer().AddEventHandler(ec)
+       if err != nil {
+               t.Fatalf("failed to add device class informer event handler: %v", err)
+       }
+       informerFactory.Start(ctx.Done())
+       t.Cleanup(func() {
+               // Need to cancel before waiting for the shutdown.
+               cancel()
+               // Now we can wait for all goroutines to stop.
+               informerFactory.Shutdown()
+       })
+       informerFactory.WaitForCacheSync(ctx.Done())
+       cache.WaitForNamedCacheSyncWithContext(ctx, handle.HasSynced)
+
+       // fake.Clientset suffers from a race condition related to informers:
+       // it does not implement resource version support in its Watch
+       // implementation and instead assumes that watches are set up
+       // before further changes are made.
+       //
+       // If a test waits for caches to be synced and then immediately
+       // adds an object, that new object will never be seen by event handlers
+       // if the race goes wrong and the Watch call hadn't completed yet
+       // (can be triggered by adding a sleep before https://github.com/kubernetes/kubernetes/blob/b53b9fb5573323484af9a19cf3f5bfe80760abba/staging/src/k8s.io/client-go/tools/cache/reflector.go#L431).
+       //
+       // To work around that, we wait here for the goroutines which
+       // are involved in setting up the watch *before* creating
+       // DeviceClasses.
+       synctest.Wait()
+
+       return ctx, client, ec
+}