import (
"context"
"testing"
+ "testing/synctest"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2/ktesting"
+ _ "k8s.io/klog/v2/ktesting/init" // Add command line flags.
"k8s.io/utils/ptr"
)
erCache.OnDelete(updatedClass)
}
-func TestExtendedResourceCache(t *testing.T) {
- logger, ctx := ktesting.NewTestContext(t)
- tCtx, tCancel := context.WithCancel(ctx)
- client := fake.NewClientset()
- informerFactory := informers.NewSharedInformerFactory(client, 0)
-
- deviceClassInformer := informerFactory.Resource().V1().DeviceClasses()
- cache := NewExtendedResourceCache(logger)
- if _, err := deviceClassInformer.Informer().AddEventHandler(cache); err != nil {
- logger.Error(err, "Failed to add event handler for device classes")
- }
- informerFactory.Start(tCtx.Done())
- t.Cleanup(func() {
- // Need to cancel before waiting for the shutdown.
- tCancel()
- // Now we can wait for all goroutines to stop.
- informerFactory.Shutdown()
- })
- informerFactory.WaitForCacheSync(tCtx.Done())
+func TestExtendedResourceCache(t *testing.T) { synctest.Test(t, testExtendedResourceCache) }
+func testExtendedResourceCache(t *testing.T) {
+ tCtx, client, cache := setup(t)
// Test with a device class that has an explicit extended resource name
now := time.Now()
if err != nil {
t.Fatalf("Failed to create device class: %v", err)
}
- time.Sleep(1 * time.Second)
+ synctest.Wait()
// Verify explicit mapping
deviceClass := cache.GetDeviceClass("example.com/gpu")
if err != nil {
t.Fatalf("Failed to create device class: %v", err)
}
- time.Sleep(1 * time.Second)
+ synctest.Wait()
// should keep deviceClass1, since it is newer than deviceClass3
deviceClass = cache.GetDeviceClass("example.com/gpu")
if err != nil {
t.Fatalf("Failed to create device class: %v", err)
}
- time.Sleep(1 * time.Second)
+ synctest.Wait()
// deviceClass4 replaces deviceClass1, since it is newer with the same example.com/gpu extended resource name
deviceClass = cache.GetDeviceClass("example.com/gpu")
if err != nil {
t.Fatalf("Failed to create device class: %v", err)
}
- time.Sleep(1 * time.Second)
+ synctest.Wait()
// deviceClass0 replaces deviceClass4, it is created at the same time as deviceClass4, but its name is
// alphabetically ordered earlier
if err != nil {
t.Fatalf("Failed to update device class: %v", err)
}
- time.Sleep(1 * time.Second)
+ synctest.Wait()
// Should have the new mapping
deviceClass = cache.GetDeviceClass("test.com/gpu")
if err != nil {
t.Fatalf("Failed to delete device class: %v", err)
}
- time.Sleep(1 * time.Second)
+ synctest.Wait()
deviceClass = cache.GetDeviceClass("test.com/gpu")
if deviceClass != nil {
}
}
-func TestDeviceClassMapping(t *testing.T) {
- logger, ctx := ktesting.NewTestContext(t)
- tCtx, tCancel := context.WithCancel(ctx)
-
- client := fake.NewClientset()
- informerFactory := informers.NewSharedInformerFactory(client, 0)
- cache := NewExtendedResourceCache(logger)
- if _, err := informerFactory.Resource().V1().DeviceClasses().Informer().AddEventHandler(cache); err != nil {
- logger.Error(err, "failed to add device class informer event handler")
- }
- informerFactory.Start(tCtx.Done())
- t.Cleanup(func() {
- // Need to cancel before waiting for the shutdown.
- tCancel()
- // Now we can wait for all goroutines to stop.
- informerFactory.Shutdown()
- })
- informerFactory.WaitForCacheSync(tCtx.Done())
+func TestDeviceClassMapping(t *testing.T) { synctest.Test(t, testDeviceClassMapping) }
+func testDeviceClassMapping(t *testing.T) {
+ tCtx, client, cache := setup(t)
deviceClass1 := &resourceapi.DeviceClass{
ObjectMeta: metav1.ObjectMeta{
t.Fatalf("Failed to create device class: %v", err)
}
- time.Sleep(1 * time.Second)
+ // Wait for background goroutines to handle the new classes.
+ synctest.Wait()
name := cache.GetExtendedResource("gpu-class")
if name != "example.com/gpu" {
t.Errorf("Expected to find device class 'gpu-class', got %s", name)
t.Fatalf("Failed to update device class: %v", err)
}
- time.Sleep(1 * time.Second)
+ synctest.Wait()
name = cache.GetExtendedResource("gpu-class")
if name != "my.com/gpu" {
t.Errorf("Expected to find device class 'gpu-class' with 'my.com/gpu' after modification, got %s", name)
t.Fatalf("Failed to delete device class: %v", err)
}
- time.Sleep(1 * time.Second)
+ synctest.Wait()
name = cache.GetExtendedResource("gpu-class")
if name != "" {
t.Error("Expected 'gpu-class' not found after deletion")
}
}
+
+func setup(t *testing.T) (context.Context, *fake.Clientset, *ExtendedResourceCache) {
+ logger, ctx := ktesting.NewTestContext(t)
+ ctx, cancel := context.WithCancel(ctx)
+ t.Cleanup(cancel)
+
+ client := fake.NewClientset()
+ informerFactory := informers.NewSharedInformerFactory(client, 0)
+ ec := NewExtendedResourceCache(logger)
+ handle, err := informerFactory.Resource().V1().DeviceClasses().Informer().AddEventHandler(ec)
+ if err != nil {
+ t.Fatalf("failed to add device class informer event handler: %v", err)
+ }
+ informerFactory.Start(ctx.Done())
+ t.Cleanup(func() {
+ // Need to cancel before waiting for the shutdown.
+ cancel()
+ // Now we can wait for all goroutines to stop.
+ informerFactory.Shutdown()
+ })
+ informerFactory.WaitForCacheSync(ctx.Done())
+ cache.WaitForNamedCacheSyncWithContext(ctx, handle.HasSynced)
+
+ // fake.Clientset suffers from a race condition related to informers:
+ // it does not implement resource version support in its Watch
+ // implementation and instead assumes that watches are set up
+ // before further changes are made.
+ //
+ // If a test waits for caches to be synced and then immediately
+ // adds an object, that new object will never be seen by event handlers
+ // if the race goes wrong and the Watch call hadn't completed yet
+ // (can be triggered by adding a sleep before https://github.com/kubernetes/kubernetes/blob/b53b9fb5573323484af9a19cf3f5bfe80760abba/staging/src/k8s.io/client-go/tools/cache/reflector.go#L431).
+ //
+ // To work around that, we wait here for the goroutines which
+ // are involved in setting up the watch *before* creating
+ // DeviceClasses.
+ synctest.Wait()
+
+ return ctx, client, ec
+}