]> git.feebdaed.xyz Git - 0xmirror/containerd.git/commitdiff
cri: Add background stats collector to calculate UsageNanoCores
authorDavanum Srinivas <davanum@gmail.com>
Thu, 4 Dec 2025 18:29:07 +0000 (13:29 -0500)
committerDavanum Srinivas <davanum@gmail.com>
Mon, 8 Dec 2025 21:03:06 +0000 (16:03 -0500)
adds a background stats collector that calculates `UsageNanoCores` for containers and pod sandboxes.

- run in the background every second to collect CPU metrics for all containers and sandboxes (similar to what cAdvisor does)
- keep a rolling buffer of CPU samples and calculates the instantaneous CPU usage rate from consecutive samples
- read pod-level CPU stats from the parent cgroup rather than the pause container
- add cgroupv2 Pressure Stall Information for CPU, memory, and IO
- add missing `Timestamp` and `Interfaces` fields

when Kubernetes runs with `PodAndContainerStatsFromCRI=true`, it expects `UsageNanoCores` to be set in stats responses.
This value represents how much CPU is being used right now (as opposed to `UsageCoreNanoSeconds` which is cumulative).
To calculate it, we need to compare CPU samples over time to replicate what is in cadvisor.

we can't yet really test this in CI as some changes in kubernetes has to land for `--feature-gates=PodAndContainerStatsFromCRI=true`

Signed-off-by: Davanum Srinivas <davanum@gmail.com>
16 files changed:
.github/workflows/node-e2e.yml
docs/cri/config.md
internal/cri/config/config.go
internal/cri/server/container_stats_list.go
internal/cri/server/sandbox_stats_linux.go
internal/cri/server/service.go
internal/cri/server/service_test.go
internal/cri/server/stats_collector.go [new file with mode: 0644]
internal/cri/server/stats_collector_other.go [new file with mode: 0644]
internal/cri/store/container/container.go
internal/cri/store/container/container_test.go
internal/cri/store/sandbox/sandbox.go
internal/cri/store/sandbox/sandbox_test.go
internal/cri/store/stats/timed_store.go [new file with mode: 0644]
internal/cri/store/stats/timed_store_test.go [new file with mode: 0644]
internal/cri/store/util.go

index f4441d125f6487cfcd6801b79c49dcaffce2a8f3..41a1f19f8997292f3661ca2c1c238137d6034c9a 100644 (file)
@@ -11,6 +11,25 @@ jobs:
     runs-on: ubuntu-24.04
     timeout-minutes: 90
     steps:
+      - name: Clean up disk space
+        run: |
+          df -h
+          sudo rm -rf /usr/share/dotnet \
+            /usr/local/graalvm \
+            /usr/local/.ghcup \
+            /usr/local/share/powershell \
+            /usr/local/share/chromium \
+            /usr/local/share/firefox \
+            /usr/local/lib/android \
+            /usr/local/lib/node_modules \
+            /usr/local/share/podman \
+            /usr/local/aws-cli/ \
+            /usr/local/lib/heroku \
+            /usr/local/rustup \
+            /usr/local/cargo \
+            /usr/local/google-cloud-sdk
+          df -h
+
       - name: Checkout containerd
         uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
         with:
@@ -21,6 +40,7 @@ jobs:
         with:
           repository: kubernetes/kubernetes
           path: src/k8s.io/kubernetes
+          fetch-depth: 0
 
       - name: Install Go
         uses: ./src/github.com/containerd/containerd/.github/actions/install-go
index 49c81a2721aa030e49a79ff89bc7e836aaf23031..caa55daf3e68b1f957e96e1a43d2eec4cc742f58 100644 (file)
@@ -272,6 +272,8 @@ version = 3
     cdi_spec_dirs = ['/etc/cdi', '/var/run/cdi']
     drain_exec_sync_io_timeout = '0s'
     ignore_deprecation_warnings = []
+    stats_collection_period = '1s'
+    stats_retention_period = '2m'
 
     [plugins.'io.containerd.cri.v1.runtime'.containerd]
       default_runtime_name = 'runc'
index 98d62a7054faf41b0fbd3be633d835e9cc5cac2d..018d2b712c084484b0c10bbce6bd54d1b7ac7123 100644 (file)
@@ -427,6 +427,19 @@ type RuntimeConfig struct {
        // IgnoreDeprecationWarnings is the list of the deprecation IDs (such as "io.containerd.deprecation/pull-schema-1-image")
        // that should be ignored for checking "ContainerdHasNoDeprecationWarnings" condition.
        IgnoreDeprecationWarnings []string `toml:"ignore_deprecation_warnings" json:"ignoreDeprecationWarnings"`
+
+       // StatsCollectionPeriod is the period for collecting container/sandbox CPU stats
+       // used for calculating UsageNanoCores. This matches cAdvisor's default housekeeping interval.
+       // The string is in the golang duration format, see:
+       //   https://golang.org/pkg/time/#ParseDuration
+       // Default: "1s"
+       StatsCollectionPeriod string `toml:"stats_collection_period" json:"statsCollectionPeriod"`
+
+       // StatsRetentionPeriod is how long to retain CPU stats samples for calculating UsageNanoCores.
+       // The string is in the golang duration format, see:
+       //   https://golang.org/pkg/time/#ParseDuration
+       // Default: "2m"
+       StatsRetentionPeriod string `toml:"stats_retention_period" json:"statsRetentionPeriod"`
 }
 
 // X509KeyPairStreaming contains the x509 configuration for streaming
@@ -669,6 +682,18 @@ func ValidateRuntimeConfig(ctx context.Context, c *RuntimeConfig) ([]deprecation
                        return warnings, fmt.Errorf("invalid `drain_exec_sync_io_timeout`: %w", err)
                }
        }
+       // Validation for stats_collection_period
+       if c.StatsCollectionPeriod != "" {
+               if _, err := time.ParseDuration(c.StatsCollectionPeriod); err != nil {
+                       return warnings, fmt.Errorf("invalid `stats_collection_period`: %w", err)
+               }
+       }
+       // Validation for stats_retention_period
+       if c.StatsRetentionPeriod != "" {
+               if _, err := time.ParseDuration(c.StatsRetentionPeriod); err != nil {
+                       return warnings, fmt.Errorf("invalid `stats_retention_period`: %w", err)
+               }
+       }
        if err := ValidateEnableUnprivileged(ctx, c); err != nil {
                return warnings, err
        }
index 59134d86d5aa8eb0faf586681316617a16aa58b9..9896eb75eb12a43b4e6bf69472c9537d673d2cff 100644 (file)
@@ -156,11 +156,11 @@ func (c *criService) toContainerStats(
                }
 
                if cs.stats.Cpu != nil && cs.stats.Cpu.UsageCoreNanoSeconds != nil {
-                       // this is a calculated value and should be computed for all OSes
+                       // UsageNanoCores is a calculated value and should be computed for all OSes
                        nanoUsage, err := c.getUsageNanoCores(cntr.Metadata.ID, false, cs.stats.Cpu.UsageCoreNanoSeconds.Value, time.Unix(0, cs.stats.Cpu.Timestamp))
                        if err != nil {
                                // If an error occurred when getting nano cores usage, skip the container
-                               log.G(ctx).Warnf("skipping container %q, failed to get metrics handler: %v", cntr.ID, err.Error())
+                               log.G(ctx).WithError(err).Warnf("failed to get usage nano cores for container %q", cntr.ID)
                                continue
                        }
                        cs.stats.Cpu.UsageNanoCores = &runtime.UInt64Value{Value: nanoUsage}
@@ -179,6 +179,17 @@ func (c *criService) toCRIContainerStats(css []containerStats) *runtime.ListCont
 }
 
 func (c *criService) getUsageNanoCores(containerID string, isSandbox bool, currentUsageCoreNanoSeconds uint64, currentTimestamp time.Time) (uint64, error) {
+       // First, try to get pre-calculated UsageNanoCores from the background stats collector.
+       // This ensures we have valid data even on the first query (as the collector runs
+       // continuously in the background, similar to cAdvisor's housekeeping).
+       if c.statsCollector != nil {
+               if usageNanoCores, ok := c.statsCollector.GetUsageNanoCores(containerID); ok {
+                       return usageNanoCores, nil
+               }
+       }
+
+       // Fall back to the original implementation if the collector doesn't have data.
+       // This can happen for newly created containers that haven't been collected yet.
        var oldStats *stats.ContainerStats
 
        if isSandbox {
@@ -424,9 +435,9 @@ func (c *criService) linuxContainerMetrics(
                return containerStats{}, fmt.Errorf("failed to obtain memory stats: %w", err)
        }
        cs.Memory = memoryStats
-       if err != nil {
-               return containerStats{}, fmt.Errorf("failed to obtain pid count: %w", err)
-       }
+
+       // IO stats (only has PSI for cgroupv2)
+       cs.Io = c.ioContainerStats(metrics, protobuf.FromTimestamp(stats.Timestamp))
 
        return containerStats{&cs, pids}, nil
 }
@@ -482,6 +493,36 @@ func getAvailableBytesV2(memory *cg2.MemoryStat, workingSetBytes uint64) uint64
        return 0
 }
 
+// convertCg2PSIToCRI converts cgroupv2 PSIStats to CRI PsiStats.
+// cgroupv2 PSI Total is in microseconds, CRI expects nanoseconds.
+func convertCg2PSIToCRI(psi *cg2.PSIStats) *runtime.PsiStats {
+       if psi == nil {
+               return nil
+       }
+
+       result := &runtime.PsiStats{}
+
+       if psi.Full != nil {
+               result.Full = &runtime.PsiData{
+                       Total:  psi.Full.Total * 1000, // convert microseconds to nanoseconds
+                       Avg10:  psi.Full.Avg10,
+                       Avg60:  psi.Full.Avg60,
+                       Avg300: psi.Full.Avg300,
+               }
+       }
+
+       if psi.Some != nil {
+               result.Some = &runtime.PsiData{
+                       Total:  psi.Some.Total * 1000, // convert microseconds to nanoseconds
+                       Avg10:  psi.Some.Avg10,
+                       Avg60:  psi.Some.Avg60,
+                       Avg300: psi.Some.Avg300,
+               }
+       }
+
+       return result
+}
+
 func (c *criService) cpuContainerStats(ID string, isSandbox bool, stats cgroupMetrics, timestamp time.Time) (*runtime.CpuUsage, error) {
        switch {
        case stats.v1 != nil:
@@ -498,10 +539,10 @@ func (c *criService) cpuContainerStats(ID string, isSandbox bool, stats cgroupMe
                if metrics.CPU != nil {
                        // convert to nano seconds
                        usageCoreNanoSeconds := metrics.CPU.UsageUsec * 1000
-
                        return &runtime.CpuUsage{
                                Timestamp:            timestamp.UnixNano(),
                                UsageCoreNanoSeconds: &runtime.UInt64Value{Value: usageCoreNanoSeconds},
+                               Psi:                  convertCg2PSIToCRI(metrics.CPU.PSI),
                        }, nil
                }
        }
@@ -544,8 +585,26 @@ func (c *criService) memoryContainerStats(ID string, stats cgroupMetrics, timest
                                RssBytes:        &runtime.UInt64Value{Value: metrics.Memory.Anon},
                                PageFaults:      &runtime.UInt64Value{Value: metrics.Memory.Pgfault},
                                MajorPageFaults: &runtime.UInt64Value{Value: metrics.Memory.Pgmajfault},
+                               Psi:             convertCg2PSIToCRI(metrics.Memory.PSI),
                        }, nil
                }
        }
        return nil, nil
 }
+
+func (c *criService) ioContainerStats(stats cgroupMetrics, timestamp time.Time) *runtime.IoUsage {
+       switch {
+       case stats.v1 != nil:
+               // cgroupv1 doesn't have IO PSI stats
+               return nil
+       case stats.v2 != nil:
+               metrics := stats.v2
+               if metrics.Io != nil && metrics.Io.PSI != nil {
+                       return &runtime.IoUsage{
+                               Timestamp: timestamp.UnixNano(),
+                               Psi:       convertCg2PSIToCRI(metrics.Io.PSI),
+                       }
+               }
+       }
+       return nil
+}
index c8bbb22f995386e28c49e28c6bc05b9a01f917b7..5cab11558f1d58de01416ffdd1d1ca6accbfd09b 100644 (file)
@@ -63,6 +63,13 @@ func (c *criService) podSandboxStats(
        if err != nil {
                return nil, fmt.Errorf("failed to obtain cpu stats: %w", err)
        }
+       if cpuStats != nil && cpuStats.UsageCoreNanoSeconds != nil {
+               nanoUsage, err := c.getUsageNanoCores(meta.ID, true /* isSandbox */, cpuStats.UsageCoreNanoSeconds.Value, timestamp)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to get usage nano cores: %w", err)
+               }
+               cpuStats.UsageNanoCores = &runtime.UInt64Value{Value: nanoUsage}
+       }
        podSandboxStats.Linux.Cpu = cpuStats
 
        memoryStats, err := c.memoryContainerStats(meta.ID, *stats, timestamp)
@@ -76,14 +83,17 @@ func (c *criService) podSandboxStats(
                if err != nil {
                        return nil, fmt.Errorf("failed to obtain network stats: %w", err)
                }
+               defaultInterface := &runtime.NetworkInterfaceUsage{
+                       Name:     defaultIfName,
+                       RxBytes:  &runtime.UInt64Value{Value: linkStats.RxBytes},
+                       RxErrors: &runtime.UInt64Value{Value: linkStats.RxErrors},
+                       TxBytes:  &runtime.UInt64Value{Value: linkStats.TxBytes},
+                       TxErrors: &runtime.UInt64Value{Value: linkStats.TxErrors},
+               }
                podSandboxStats.Linux.Network = &runtime.NetworkUsage{
-                       DefaultInterface: &runtime.NetworkInterfaceUsage{
-                               Name:     defaultIfName,
-                               RxBytes:  &runtime.UInt64Value{Value: linkStats.RxBytes},
-                               RxErrors: &runtime.UInt64Value{Value: linkStats.RxErrors},
-                               TxBytes:  &runtime.UInt64Value{Value: linkStats.TxBytes},
-                               TxErrors: &runtime.UInt64Value{Value: linkStats.TxErrors},
-                       },
+                       Timestamp:        timestamp.UnixNano(),
+                       DefaultInterface: defaultInterface,
+                       Interfaces:       []*runtime.NetworkInterfaceUsage{defaultInterface},
                }
        }
 
index dcbbc7588ca122467a360440ad3e76639edbb853..69232d056163012fecc92802c53cbac6370e4f8c 100644 (file)
@@ -160,6 +160,8 @@ type criService struct {
        runtimeHandlers map[string]*runtime.RuntimeHandler
        // runtimeFeatures container runtime features info
        runtimeFeatures *runtime.RuntimeFeatures
+       // statsCollector collects CPU stats in background for UsageNanoCores calculation
+       statsCollector *StatsCollector
 }
 
 type CRIServiceOptions struct {
@@ -187,6 +189,9 @@ func NewCRIService(options *CRIServiceOptions) (CRIService, runtime.RuntimeServi
        labels := label.NewStore()
        config := options.RuntimeService.Config()
 
+       // Create the stats collector first so it can be passed to the stores
+       statsCollector := NewStatsCollector(config)
+
        c := &criService{
                RuntimeService:     options.RuntimeService,
                ImageService:       options.ImageService,
@@ -194,13 +199,14 @@ func NewCRIService(options *CRIServiceOptions) (CRIService, runtime.RuntimeServi
                client:             options.Client,
                imageFSPaths:       options.ImageService.ImageFSPaths(),
                os:                 osinterface.RealOS{},
-               sandboxStore:       sandboxstore.NewStore(labels),
-               containerStore:     containerstore.NewStore(labels),
+               sandboxStore:       sandboxstore.NewStore(labels, statsCollector),
+               containerStore:     containerstore.NewStore(labels, statsCollector),
                sandboxNameIndex:   registrar.NewRegistrar(),
                containerNameIndex: registrar.NewRegistrar(),
                netPlugin:          make(map[string]cni.CNI),
                sandboxService:     newCriSandboxService(&config, options.SandboxControllers),
                runtimeHandlers:    make(map[string]*runtime.RuntimeHandler),
+               statsCollector:     statsCollector,
        }
 
        // TODO: Make discard time configurable
@@ -265,6 +271,13 @@ func (c *criService) Run(ready func()) error {
        // then you have to manually filter namespace foo
        c.eventMonitor.Subscribe(c.client, []string{`topic=="/tasks/oom"`, `topic~="/images/"`})
 
+       // Start the background stats collector for UsageNanoCores calculation
+       log.L.Info("Start stats collector")
+       if c.statsCollector != nil {
+               c.statsCollector.SetService(c)
+               c.statsCollector.Start()
+       }
+
        log.L.Infof("Start recovering state")
        if err := c.recover(ctrdutil.NamespacedContext()); err != nil {
                return fmt.Errorf("failed to recover state: %w", err)
@@ -359,6 +372,9 @@ func (c *criService) Close() error {
                }
        }
        c.eventMonitor.Stop()
+       if c.statsCollector != nil {
+               c.statsCollector.Stop()
+       }
        if err := c.streamServer.Stop(); err != nil {
                return fmt.Errorf("failed to stop stream server: %w", err)
        }
index b6d9ecfb301b7e9bf70cc48101ef233dd99815ab..5eaa7abff5d3dcbe05941ee0f7292e81e7d559cd 100644 (file)
@@ -143,9 +143,9 @@ func newTestCRIService(opts ...testOpt) *criService {
        service := &criService{
                config:             testConfig,
                os:                 ostesting.NewFakeOS(),
-               sandboxStore:       sandboxstore.NewStore(labels),
+               sandboxStore:       sandboxstore.NewStore(labels, nil),
                sandboxNameIndex:   registrar.NewRegistrar(),
-               containerStore:     containerstore.NewStore(labels),
+               containerStore:     containerstore.NewStore(labels, nil),
                containerNameIndex: registrar.NewRegistrar(),
                netPlugin: map[string]cni.CNI{
                        defaultNetworkPlugin: servertesting.NewFakeCNIPlugin(),
diff --git a/internal/cri/server/stats_collector.go b/internal/cri/server/stats_collector.go
new file mode 100644 (file)
index 0000000..e5e92ca
--- /dev/null
@@ -0,0 +1,335 @@
+//go:build linux
+
+/*
+   Copyright The containerd Authors.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package server
+
+import (
+       "context"
+       "sync"
+       "time"
+
+       "github.com/containerd/cgroups/v3"
+       "github.com/containerd/cgroups/v3/cgroup1"
+       cg1 "github.com/containerd/cgroups/v3/cgroup1/stats"
+       cgroupsv2 "github.com/containerd/cgroups/v3/cgroup2"
+       cg2 "github.com/containerd/cgroups/v3/cgroup2/stats"
+       "github.com/containerd/log"
+       "github.com/containerd/typeurl/v2"
+
+       "github.com/containerd/containerd/api/services/tasks/v1"
+       criconfig "github.com/containerd/containerd/v2/internal/cri/config"
+       "github.com/containerd/containerd/v2/internal/cri/store/stats"
+       ctrdutil "github.com/containerd/containerd/v2/internal/cri/util"
+)
+
+const (
+       // defaultCollectionInterval is how often the collector fetches metrics.
+       // This matches cAdvisor's default housekeeping interval.
+       defaultCollectionInterval = 1 * time.Second
+
+       // defaultStatsAge is how long stats samples are retained.
+       defaultStatsAge = 2 * time.Minute
+)
+
+// StatsCollector periodically collects CPU stats for containers and sandboxes,
+// storing them in TimedStores. This allows UsageNanoCores to be calculated
+// from historical samples, similar to how cAdvisor works.
+type StatsCollector struct {
+       mu sync.RWMutex
+       // stores maps container/sandbox ID to their TimedStore
+       stores map[string]*stats.TimedStore
+       // interval is how often to collect stats
+       interval time.Duration
+       // statsAge is how long to keep samples
+       statsAge time.Duration
+       // maxSamples is the max number of samples per container
+       maxSamples int
+       // stopCh is used to stop the collection loop
+       stopCh chan struct{}
+       // doneCh signals when the collection loop has stopped
+       doneCh chan struct{}
+       // stopOnce ensures Stop() only closes stopCh once
+       stopOnce sync.Once
+       // service provides access to task metrics
+       service *criService
+}
+
+// NewStatsCollector creates a new StatsCollector.
+// The service must be set via SetService before calling Start.
+func NewStatsCollector(config criconfig.Config) *StatsCollector {
+       interval := defaultCollectionInterval
+       statsAge := defaultStatsAge
+
+       // Use config values if provided
+       if config.StatsCollectionPeriod != "" {
+               if d, err := time.ParseDuration(config.StatsCollectionPeriod); err == nil {
+                       interval = d
+               }
+       }
+       if config.StatsRetentionPeriod != "" {
+               if d, err := time.ParseDuration(config.StatsRetentionPeriod); err == nil {
+                       statsAge = d
+               }
+       }
+
+       // Calculate maxSamples from statsAge and interval
+       maxSamples := int(statsAge / interval)
+       if maxSamples < 2 {
+               maxSamples = 2 // Need at least 2 samples to calculate rate
+       }
+
+       return &StatsCollector{
+               stores:     make(map[string]*stats.TimedStore),
+               interval:   interval,
+               statsAge:   statsAge,
+               maxSamples: maxSamples,
+               stopCh:     make(chan struct{}),
+               doneCh:     make(chan struct{}),
+       }
+}
+
+// SetService sets the criService reference. Must be called before Start.
+func (c *StatsCollector) SetService(service *criService) {
+       c.service = service
+}
+
+// Start begins the background stats collection loop.
+func (c *StatsCollector) Start() {
+       go c.collectLoop()
+}
+
+// Stop stops the background stats collection loop and waits for it to finish.
+// It is safe to call Stop multiple times.
+func (c *StatsCollector) Stop() {
+       c.stopOnce.Do(func() {
+               close(c.stopCh)
+       })
+       <-c.doneCh
+}
+
+// collectLoop periodically collects stats for all containers and sandboxes.
+func (c *StatsCollector) collectLoop() {
+       ticker := time.NewTicker(c.interval)
+       defer ticker.Stop()
+       defer close(c.doneCh)
+
+       // Do an initial collection immediately
+       c.collect()
+
+       for {
+               select {
+               case <-c.stopCh:
+                       return
+               case <-ticker.C:
+                       c.collect()
+               }
+       }
+}
+
+// collect fetches metrics for all containers and sandboxes and stores them.
+func (c *StatsCollector) collect() {
+       // Use namespaced context to access containers in the k8s.io namespace
+       ctx := ctrdutil.NamespacedContext()
+
+       // Collect container stats
+       c.collectContainerStats(ctx)
+
+       // Collect sandbox stats (Linux only, uses cgroup metrics)
+       c.collectSandboxStats(ctx)
+}
+
+// collectContainerStats fetches metrics for all containers.
+func (c *StatsCollector) collectContainerStats(ctx context.Context) {
+       containers := c.service.containerStore.List()
+       if len(containers) == 0 {
+               return
+       }
+
+       // Build request for all containers
+       req := &tasks.MetricsRequest{}
+       for _, cntr := range containers {
+               req.Filters = append(req.Filters, "id=="+cntr.ID)
+       }
+
+       resp, err := c.service.client.TaskService().Metrics(ctx, req)
+       if err != nil {
+               log.G(ctx).WithError(err).Debug("StatsCollector: failed to fetch container metrics")
+               return
+       }
+
+       timestamp := time.Now()
+       for _, metric := range resp.Metrics {
+               usageCoreNanoSeconds, ok := extractCPUUsage(metric.Data)
+               if !ok {
+                       continue
+               }
+               c.addSample(metric.ID, timestamp, usageCoreNanoSeconds)
+       }
+}
+
+// collectSandboxStats fetches metrics for all sandboxes.
+// On Linux, sandbox/pod stats come from the parent cgroup (which includes all
+// containers in the pod), not from the pause container's task metrics.
+// This matches how sandbox_stats_linux.go retrieves pod-level CPU stats.
+func (c *StatsCollector) collectSandboxStats(ctx context.Context) {
+       sandboxes := c.service.sandboxStore.List()
+       if len(sandboxes) == 0 {
+               return
+       }
+
+       timestamp := time.Now()
+       for _, sb := range sandboxes {
+               // Get the parent cgroup path for the pod
+               cgroupPath := sb.Config.GetLinux().GetCgroupParent()
+               if cgroupPath == "" {
+                       continue
+               }
+
+               usageCoreNanoSeconds, ok := c.getCgroupCPUUsage(ctx, cgroupPath)
+               if !ok {
+                       continue
+               }
+               c.addSample(sb.ID, timestamp, usageCoreNanoSeconds)
+       }
+}
+
+// getCgroupCPUUsage reads CPU usage from a cgroup path.
+// Supports both cgroupv1 and cgroupv2.
+func (c *StatsCollector) getCgroupCPUUsage(ctx context.Context, cgroupPath string) (uint64, bool) {
+       switch cgroups.Mode() {
+       case cgroups.Unified:
+               cg, err := cgroupsv2.Load(cgroupPath)
+               if err != nil {
+                       log.G(ctx).WithError(err).Debugf("StatsCollector: failed to load cgroupv2: %s", cgroupPath)
+                       return 0, false
+               }
+               stats, err := cg.Stat()
+               if err != nil {
+                       log.G(ctx).WithError(err).Debugf("StatsCollector: failed to get cgroupv2 stats: %s", cgroupPath)
+                       return 0, false
+               }
+               if stats.CPU != nil {
+                       // cgroupv2 reports in microseconds, convert to nanoseconds
+                       return stats.CPU.UsageUsec * 1000, true
+               }
+       default:
+               control, err := cgroup1.Load(cgroup1.StaticPath(cgroupPath))
+               if err != nil {
+                       log.G(ctx).WithError(err).Debugf("StatsCollector: failed to load cgroupv1: %s", cgroupPath)
+                       return 0, false
+               }
+               stats, err := control.Stat(cgroup1.IgnoreNotExist)
+               if err != nil {
+                       log.G(ctx).WithError(err).Debugf("StatsCollector: failed to get cgroupv1 stats: %s", cgroupPath)
+                       return 0, false
+               }
+               if stats.CPU != nil && stats.CPU.Usage != nil {
+                       return stats.CPU.Usage.Total, true
+               }
+       }
+       return 0, false
+}
+
+// addSample adds a CPU sample for the given container/sandbox ID.
+// If a store doesn't exist for the ID, one is created (fallback for containers
+// that existed before the collector started, e.g., after a restart).
+func (c *StatsCollector) addSample(id string, timestamp time.Time, usageCoreNanoSeconds uint64) {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+
+       store, ok := c.stores[id]
+       if !ok {
+               store = stats.NewTimedStore(c.statsAge, c.maxSamples)
+               c.stores[id] = store
+       }
+       store.Add(timestamp, usageCoreNanoSeconds)
+}
+
+// GetUsageNanoCores returns the latest calculated UsageNanoCores for the given
+// container/sandbox ID. Returns 0 and false if no data is available or if
+// there aren't enough samples to calculate the rate.
+func (c *StatsCollector) GetUsageNanoCores(id string) (uint64, bool) {
+       c.mu.RLock()
+       store, ok := c.stores[id]
+       c.mu.RUnlock()
+
+       if !ok {
+               return 0, false
+       }
+
+       return store.GetLatestUsageNanoCores()
+}
+
+// GetLatestSample returns the latest CPU sample for the given container/sandbox ID.
+// Returns nil if no data is available.
+func (c *StatsCollector) GetLatestSample(id string) *stats.CPUSample {
+       c.mu.RLock()
+       store, ok := c.stores[id]
+       c.mu.RUnlock()
+
+       if !ok {
+               return nil
+       }
+
+       return store.GetLatest()
+}
+
+// AddContainer creates a stats store for a container/sandbox.
+// Should be called when a container or sandbox is added to the store.
+func (c *StatsCollector) AddContainer(id string) {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       if _, ok := c.stores[id]; !ok {
+               c.stores[id] = stats.NewTimedStore(c.statsAge, c.maxSamples)
+       }
+}
+
+// RemoveContainer removes the stats store for a container/sandbox.
+// Should be called when a container or sandbox is removed from the store.
+func (c *StatsCollector) RemoveContainer(id string) {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       delete(c.stores, id)
+}
+
+// extractCPUUsage extracts the CPU usage in nanoseconds from the metric data.
+// Supports both cgroupv1 and cgroupv2 metrics.
+func extractCPUUsage(data typeurl.Any) (uint64, bool) {
+       if data == nil {
+               return 0, false
+       }
+
+       taskMetrics, err := typeurl.UnmarshalAny(data)
+       if err != nil {
+               return 0, false
+       }
+
+       switch v := taskMetrics.(type) {
+       case *cg1.Metrics:
+               if v.CPU != nil && v.CPU.Usage != nil {
+                       return v.CPU.Usage.Total, true
+               }
+       case *cg2.Metrics:
+               if v.CPU != nil {
+                       // cgroupv2 reports in microseconds, convert to nanoseconds
+                       return v.CPU.UsageUsec * 1000, true
+               }
+       }
+
+       return 0, false
+}
diff --git a/internal/cri/server/stats_collector_other.go b/internal/cri/server/stats_collector_other.go
new file mode 100644 (file)
index 0000000..d7d0087
--- /dev/null
@@ -0,0 +1,61 @@
+//go:build !linux
+
+/*
+   Copyright The containerd Authors.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package server
+
+import (
+       criconfig "github.com/containerd/containerd/v2/internal/cri/config"
+       "github.com/containerd/containerd/v2/internal/cri/store/stats"
+)
+
+// StatsCollector is a stub for non-Linux platforms.
+// On Linux, the real implementation periodically collects CPU stats for
+// containers and sandboxes, storing them in TimedStores.
+type StatsCollector struct{}
+
+// NewStatsCollector creates a new StatsCollector (stub on non-Linux).
+func NewStatsCollector(config criconfig.Config) *StatsCollector {
+       return &StatsCollector{}
+}
+
+// SetService sets the criService reference (no-op on non-Linux).
+func (c *StatsCollector) SetService(service *criService) {}
+
+// Start begins the background stats collection loop (no-op on non-Linux).
+func (c *StatsCollector) Start() {}
+
+// Stop stops the background stats collection loop (no-op on non-Linux).
+func (c *StatsCollector) Stop() {}
+
+// GetUsageNanoCores returns the latest calculated UsageNanoCores for the given
+// container/sandbox ID. Returns 0 and false on non-Linux platforms.
+func (c *StatsCollector) GetUsageNanoCores(id string) (uint64, bool) {
+       return 0, false
+}
+
+// GetLatestSample returns the latest CPU sample for the given container/sandbox ID.
+// Returns nil on non-Linux platforms.
+func (c *StatsCollector) GetLatestSample(id string) *stats.CPUSample {
+       return nil
+}
+
+// AddContainer creates a stats store for a container/sandbox (no-op on non-Linux).
+func (c *StatsCollector) AddContainer(id string) {}
+
+// RemoveContainer removes the stats store for a container/sandbox (no-op on non-Linux).
+func (c *StatsCollector) RemoveContainer(id string) {}
index 6054a157629af2a205791dc89788ab703f91db0a..4515f81c1000a7fea99c864545f551b3bd9ef02a 100644 (file)
@@ -107,18 +107,20 @@ func (c *Container) Delete() error {
 
 // Store stores all Containers.
 type Store struct {
-       lock       sync.RWMutex
-       containers map[string]Container
-       idIndex    *truncindex.TruncIndex
-       labels     *label.Store
+       lock           sync.RWMutex
+       containers     map[string]Container
+       idIndex        *truncindex.TruncIndex
+       labels         *label.Store
+       statsCollector store.StatsCollector
 }
 
 // NewStore creates a container store.
-func NewStore(labels *label.Store) *Store {
+func NewStore(labels *label.Store, statsCollector store.StatsCollector) *Store {
        return &Store{
-               containers: make(map[string]Container),
-               idIndex:    truncindex.NewTruncIndex([]string{}),
-               labels:     labels,
+               containers:     make(map[string]Container),
+               idIndex:        truncindex.NewTruncIndex([]string{}),
+               labels:         labels,
+               statsCollector: statsCollector,
        }
 }
 
@@ -137,6 +139,9 @@ func (s *Store) Add(c Container) error {
                return err
        }
        s.containers[c.ID] = c
+       if s.statsCollector != nil {
+               s.statsCollector.AddContainer(c.ID)
+       }
        return nil
 }
 
@@ -210,4 +215,7 @@ func (s *Store) Delete(id string) {
        s.labels.Release(c.ProcessLabel)
        s.idIndex.Delete(id)
        delete(s.containers, id)
+       if s.statsCollector != nil {
+               s.statsCollector.RemoveContainer(id)
+       }
 }
index ee41422c90b442154a0ddf848b9468cab8dac114..a3204e56b77969475a3f943e93b867e2ecea049e 100644 (file)
@@ -163,7 +163,7 @@ func TestContainerStore(t *testing.T) {
                containers[id] = container
        }
 
-       s := NewStore(label.NewStore())
+       s := NewStore(label.NewStore(), nil)
        reserved := map[string]bool{}
        s.labels.Reserver = func(label string) {
                reserved[strings.SplitN(label, ":", 4)[3]] = true
index a26e94f02aa5933ee9e7fb5dbaa614ed5fa4edb5..65465d617d4bc435ab3af578ddb18b10bf0dfdae 100644 (file)
@@ -76,18 +76,20 @@ func NewSandbox(metadata Metadata, status Status) Sandbox {
 
 // Store stores all sandboxes.
 type Store struct {
-       lock      sync.RWMutex
-       sandboxes map[string]Sandbox
-       idIndex   *truncindex.TruncIndex
-       labels    *label.Store
+       lock           sync.RWMutex
+       sandboxes      map[string]Sandbox
+       idIndex        *truncindex.TruncIndex
+       labels         *label.Store
+       statsCollector store.StatsCollector
 }
 
 // NewStore creates a sandbox store.
-func NewStore(labels *label.Store) *Store {
+func NewStore(labels *label.Store, statsCollector store.StatsCollector) *Store {
        return &Store{
-               sandboxes: make(map[string]Sandbox),
-               idIndex:   truncindex.NewTruncIndex([]string{}),
-               labels:    labels,
+               sandboxes:      make(map[string]Sandbox),
+               idIndex:        truncindex.NewTruncIndex([]string{}),
+               labels:         labels,
+               statsCollector: statsCollector,
        }
 }
 
@@ -106,6 +108,9 @@ func (s *Store) Add(sb Sandbox) error {
                return err
        }
        s.sandboxes[sb.ID] = sb
+       if s.statsCollector != nil {
+               s.statsCollector.AddContainer(sb.ID)
+       }
        return nil
 }
 
@@ -175,4 +180,7 @@ func (s *Store) Delete(id string) {
        s.labels.Release(s.sandboxes[id].ProcessLabel)
        s.idIndex.Delete(id)
        delete(s.sandboxes, id)
+       if s.statsCollector != nil {
+               s.statsCollector.RemoveContainer(id)
+       }
 }
index a79df9092b047f37ad046cd2c19d5eae515cfb91..34e2d2388cb89dc7fbd9abba9e091a6cdc35fe95 100644 (file)
@@ -130,7 +130,7 @@ func TestSandboxStore(t *testing.T) {
                },
        }
        assert := assertlib.New(t)
-       s := NewStore(label.NewStore())
+       s := NewStore(label.NewStore(), nil)
 
        t.Logf("should be able to add sandbox")
        for _, sb := range sandboxes {
diff --git a/internal/cri/store/stats/timed_store.go b/internal/cri/store/stats/timed_store.go
new file mode 100644 (file)
index 0000000..fc2d502
--- /dev/null
@@ -0,0 +1,182 @@
+/*
+   Copyright The containerd Authors.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package stats
+
+import (
+       "sort"
+       "sync"
+       "time"
+)
+
+// CPUSample holds a single CPU usage sample with timestamp.
+type CPUSample struct {
+       Timestamp            time.Time
+       UsageCoreNanoSeconds uint64
+       // UsageNanoCores is the instantaneous CPU usage rate calculated
+       // from this sample and the previous sample.
+       UsageNanoCores uint64
+}
+
+// timedStoreDataSlice is a time-ordered slice of CPU samples.
+type timedStoreDataSlice []CPUSample
+
+func (t timedStoreDataSlice) Less(i, j int) bool {
+       return t[i].Timestamp.Before(t[j].Timestamp)
+}
+
+func (t timedStoreDataSlice) Len() int {
+       return len(t)
+}
+
+func (t timedStoreDataSlice) Swap(i, j int) {
+       t[i], t[j] = t[j], t[i]
+}
+
+// TimedStore is a time-based buffer for CPU stats samples.
+// It stores samples for a specific time period and/or a max number of items,
+// similar to cAdvisor's TimedStore but specialized for CPU stats.
+// All methods are thread-safe.
+type TimedStore struct {
+       mu       sync.RWMutex
+       buffer   timedStoreDataSlice
+       age      time.Duration
+       maxItems int
+}
+
+// NewTimedStore creates a new TimedStore.
+// age specifies how long samples are retained.
+// maxItems specifies the maximum number of samples to keep (-1 for no limit).
+func NewTimedStore(age time.Duration, maxItems int) *TimedStore {
+       capacity := maxItems
+       if capacity < 0 {
+               capacity = 0 // Will grow dynamically
+       }
+       return &TimedStore{
+               buffer:   make(timedStoreDataSlice, 0, capacity),
+               age:      age,
+               maxItems: maxItems,
+       }
+}
+
+// Add adds a new CPU sample to the store.
+// It calculates UsageNanoCores from the previous sample if available.
+// Samples are stored in timestamp order, and old samples are evicted
+// based on age and maxItems constraints.
+func (s *TimedStore) Add(timestamp time.Time, usageCoreNanoSeconds uint64) {
+       s.mu.Lock()
+       defer s.mu.Unlock()
+
+       sample := CPUSample{
+               Timestamp:            timestamp,
+               UsageCoreNanoSeconds: usageCoreNanoSeconds,
+       }
+
+       // Calculate UsageNanoCores from the previous sample if we have one
+       if len(s.buffer) > 0 {
+               lastSample := s.buffer[len(s.buffer)-1]
+               sample.UsageNanoCores = calculateUsageNanoCores(lastSample, sample)
+       }
+
+       // Common case: data is added in order
+       if len(s.buffer) == 0 || !timestamp.Before(s.buffer[len(s.buffer)-1].Timestamp) {
+               s.buffer = append(s.buffer, sample)
+       } else {
+               // Data is out of order; insert it in the correct position
+               index := sort.Search(len(s.buffer), func(index int) bool {
+                       return s.buffer[index].Timestamp.After(timestamp)
+               })
+               s.buffer = append(s.buffer, CPUSample{}) // Make room
+               copy(s.buffer[index+1:], s.buffer[index:])
+               s.buffer[index] = sample
+               // Recalculate UsageNanoCores for this and the next sample
+               if index > 0 {
+                       s.buffer[index].UsageNanoCores = calculateUsageNanoCores(s.buffer[index-1], s.buffer[index])
+               }
+               if index+1 < len(s.buffer) {
+                       s.buffer[index+1].UsageNanoCores = calculateUsageNanoCores(s.buffer[index], s.buffer[index+1])
+               }
+       }
+
+       // Remove any elements before eviction time
+       evictTime := timestamp.Add(-s.age)
+       index := sort.Search(len(s.buffer), func(index int) bool {
+               return s.buffer[index].Timestamp.After(evictTime)
+       })
+       if index < len(s.buffer) && index > 0 {
+               s.buffer = s.buffer[index:]
+       }
+
+       // Remove any elements if over max size
+       if s.maxItems >= 0 && len(s.buffer) > s.maxItems {
+               startIndex := len(s.buffer) - s.maxItems
+               s.buffer = s.buffer[startIndex:]
+       }
+}
+
+// GetLatest returns the most recent sample, or nil if no samples exist.
+func (s *TimedStore) GetLatest() *CPUSample {
+       s.mu.RLock()
+       defer s.mu.RUnlock()
+
+       if len(s.buffer) == 0 {
+               return nil
+       }
+       sample := s.buffer[len(s.buffer)-1]
+       return &sample
+}
+
+// GetLatestUsageNanoCores returns the most recent UsageNanoCores value.
+// Returns 0 and false if no samples with calculated UsageNanoCores exist.
+func (s *TimedStore) GetLatestUsageNanoCores() (uint64, bool) {
+       s.mu.RLock()
+       defer s.mu.RUnlock()
+
+       // We need at least 2 samples to have a calculated UsageNanoCores
+       if len(s.buffer) < 2 {
+               return 0, false
+       }
+
+       sample := s.buffer[len(s.buffer)-1]
+       return sample.UsageNanoCores, true
+}
+
+// Size returns the number of samples in the store.
+func (s *TimedStore) Size() int {
+       s.mu.RLock()
+       defer s.mu.RUnlock()
+       return len(s.buffer)
+}
+
+// calculateUsageNanoCores calculates the instantaneous CPU usage rate
+// (nanocores) from two consecutive samples.
+// Returns 0 if the calculation cannot be performed (e.g., invalid time delta).
+func calculateUsageNanoCores(prev, cur CPUSample) uint64 {
+       nanoSeconds := cur.Timestamp.UnixNano() - prev.Timestamp.UnixNano()
+
+       // Zero or negative interval
+       if nanoSeconds <= 0 {
+               return 0
+       }
+
+       // CPU usage can't go backwards (might happen if container was restarted)
+       if cur.UsageCoreNanoSeconds < prev.UsageCoreNanoSeconds {
+               return 0
+       }
+
+       usageDelta := cur.UsageCoreNanoSeconds - prev.UsageCoreNanoSeconds
+       return uint64(float64(usageDelta) / float64(nanoSeconds) * float64(time.Second/time.Nanosecond))
+}
diff --git a/internal/cri/store/stats/timed_store_test.go b/internal/cri/store/stats/timed_store_test.go
new file mode 100644 (file)
index 0000000..37350e9
--- /dev/null
@@ -0,0 +1,307 @@
+/*
+   Copyright The containerd Authors.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package stats
+
+import (
+       "testing"
+       "time"
+)
+
+func TestTimedStoreBasic(t *testing.T) {
+       store := NewTimedStore(time.Minute, 10)
+
+       // Initially empty
+       if store.Size() != 0 {
+               t.Errorf("expected size 0, got %d", store.Size())
+       }
+
+       // GetLatest should return nil when empty
+       if sample := store.GetLatest(); sample != nil {
+               t.Errorf("expected nil sample, got %v", sample)
+       }
+
+       // GetLatestUsageNanoCores should return false when empty
+       if _, ok := store.GetLatestUsageNanoCores(); ok {
+               t.Error("expected false for empty store")
+       }
+}
+
+func TestTimedStoreAddAndGet(t *testing.T) {
+       store := NewTimedStore(time.Minute, 10)
+
+       now := time.Now()
+
+       // Add first sample
+       store.Add(now, 1000000000) // 1 second of CPU time in nanoseconds
+
+       if store.Size() != 1 {
+               t.Errorf("expected size 1, got %d", store.Size())
+       }
+
+       sample := store.GetLatest()
+       if sample == nil {
+               t.Fatal("expected non-nil sample")
+       }
+       if sample.UsageCoreNanoSeconds != 1000000000 {
+               t.Errorf("expected UsageCoreNanoSeconds 1000000000, got %d", sample.UsageCoreNanoSeconds)
+       }
+       // First sample should have 0 UsageNanoCores (no previous sample to calculate from)
+       if sample.UsageNanoCores != 0 {
+               t.Errorf("expected UsageNanoCores 0 for first sample, got %d", sample.UsageNanoCores)
+       }
+
+       // GetLatestUsageNanoCores should still return false (need at least 2 samples)
+       if _, ok := store.GetLatestUsageNanoCores(); ok {
+               t.Error("expected false with only 1 sample")
+       }
+}
+
+func TestTimedStoreUsageNanoCoresCalculation(t *testing.T) {
+       store := NewTimedStore(time.Minute, 10)
+
+       now := time.Now()
+
+       // Add first sample: 1 second of CPU time
+       store.Add(now, 1000000000)
+
+       // Add second sample after 1 second: 2 seconds of CPU time
+       // This means the CPU was running at 100% (1 core) during that second
+       store.Add(now.Add(time.Second), 2000000000)
+
+       if store.Size() != 2 {
+               t.Errorf("expected size 2, got %d", store.Size())
+       }
+
+       // Now we should be able to get UsageNanoCores
+       usageNanoCores, ok := store.GetLatestUsageNanoCores()
+       if !ok {
+               t.Fatal("expected true with 2 samples")
+       }
+
+       // Expected: (2000000000 - 1000000000) / 1000000000 * 1000000000 = 1000000000 nanocores (1 core)
+       expected := uint64(1000000000)
+       if usageNanoCores != expected {
+               t.Errorf("expected UsageNanoCores %d, got %d", expected, usageNanoCores)
+       }
+}
+
+func TestTimedStoreHalfCoreUsage(t *testing.T) {
+       store := NewTimedStore(time.Minute, 10)
+
+       now := time.Now()
+
+       // Add first sample
+       store.Add(now, 0)
+
+       // Add second sample after 2 seconds: 1 second of CPU time
+       // This means the CPU was running at 50% (0.5 cores) during those 2 seconds
+       store.Add(now.Add(2*time.Second), 1000000000)
+
+       usageNanoCores, ok := store.GetLatestUsageNanoCores()
+       if !ok {
+               t.Fatal("expected true with 2 samples")
+       }
+
+       // Expected: 1000000000 / 2000000000 * 1000000000 = 500000000 nanocores (0.5 cores)
+       expected := uint64(500000000)
+       if usageNanoCores != expected {
+               t.Errorf("expected UsageNanoCores %d, got %d", expected, usageNanoCores)
+       }
+}
+
+func TestTimedStoreMaxItems(t *testing.T) {
+       store := NewTimedStore(time.Hour, 3) // Keep at most 3 samples
+
+       now := time.Now()
+
+       // Add 5 samples
+       for i := 0; i < 5; i++ {
+               store.Add(now.Add(time.Duration(i)*time.Second), uint64(i)*1000000000)
+       }
+
+       // Should only have 3 samples
+       if store.Size() != 3 {
+               t.Errorf("expected size 3, got %d", store.Size())
+       }
+
+       // Latest sample should be the 5th one (index 4)
+       sample := store.GetLatest()
+       if sample == nil {
+               t.Fatal("expected non-nil sample")
+       }
+       if sample.UsageCoreNanoSeconds != 4000000000 {
+               t.Errorf("expected UsageCoreNanoSeconds 4000000000, got %d", sample.UsageCoreNanoSeconds)
+       }
+}
+
+func TestTimedStoreAgeEviction(t *testing.T) {
+       store := NewTimedStore(3*time.Second, -1) // Keep samples for 3 seconds, no item limit
+
+       now := time.Now()
+
+       // Add sample at t=0
+       store.Add(now, 0)
+
+       // Add sample at t=1s
+       store.Add(now.Add(1*time.Second), 1000000000)
+
+       // Add sample at t=2s
+       store.Add(now.Add(2*time.Second), 2000000000)
+
+       if store.Size() != 3 {
+               t.Errorf("expected size 3, got %d", store.Size())
+       }
+
+       // Add sample at t=4s - this should evict t=0 sample only
+       // evictTime = t=4s - 3s = t=1s, so samples strictly after t=1s are kept (t=2s, t=4s)
+       store.Add(now.Add(4*time.Second), 4000000000)
+
+       // Should have t=2s and t=4s samples (samples strictly after evictTime t=1s)
+       if store.Size() != 2 {
+               t.Errorf("expected size 2 after eviction, got %d", store.Size())
+       }
+
+       // Verify latest sample is the one we just added
+       sample := store.GetLatest()
+       if sample == nil {
+               t.Fatal("expected non-nil sample")
+       }
+       if sample.UsageCoreNanoSeconds != 4000000000 {
+               t.Errorf("expected UsageCoreNanoSeconds 4000000000, got %d", sample.UsageCoreNanoSeconds)
+       }
+}
+
+func TestTimedStoreConcurrentAccess(t *testing.T) {
+       store := NewTimedStore(time.Minute, 100)
+
+       now := time.Now()
+       done := make(chan bool)
+
+       // Start multiple goroutines adding samples
+       for i := 0; i < 10; i++ {
+               go func(id int) {
+                       for j := 0; j < 100; j++ {
+                               store.Add(now.Add(time.Duration(id*100+j)*time.Millisecond), uint64(id*100+j)*1000000)
+                       }
+                       done <- true
+               }(i)
+       }
+
+       // Start multiple goroutines reading
+       for i := 0; i < 10; i++ {
+               go func() {
+                       for j := 0; j < 100; j++ {
+                               store.GetLatest()
+                               store.GetLatestUsageNanoCores()
+                               store.Size()
+                       }
+                       done <- true
+               }()
+       }
+
+       // Wait for all goroutines
+       for i := 0; i < 20; i++ {
+               <-done
+       }
+
+       // Just verify no panic occurred and size is reasonable
+       if store.Size() > 100 {
+               t.Errorf("expected size <= 100, got %d", store.Size())
+       }
+}
+
+func TestCalculateUsageNanoCores(t *testing.T) {
+       // Use a fixed base time to ensure consistent time deltas in tests
+       now := time.Now()
+
+       tests := []struct {
+               name     string
+               prev     CPUSample
+               cur      CPUSample
+               expected uint64
+       }{
+               {
+                       name: "normal case - 1 core",
+                       prev: CPUSample{
+                               Timestamp:            now,
+                               UsageCoreNanoSeconds: 0,
+                       },
+                       cur: CPUSample{
+                               Timestamp:            now.Add(time.Second),
+                               UsageCoreNanoSeconds: 1000000000,
+                       },
+                       expected: 1000000000, // 1 core
+               },
+               {
+                       name: "normal case - 2 cores",
+                       prev: CPUSample{
+                               Timestamp:            now,
+                               UsageCoreNanoSeconds: 0,
+                       },
+                       cur: CPUSample{
+                               Timestamp:            now.Add(time.Second),
+                               UsageCoreNanoSeconds: 2000000000,
+                       },
+                       expected: 2000000000, // 2 cores
+               },
+               {
+                       name: "zero time delta",
+                       prev: CPUSample{
+                               Timestamp:            now,
+                               UsageCoreNanoSeconds: 0,
+                       },
+                       cur: CPUSample{
+                               Timestamp:            now, // same timestamp
+                               UsageCoreNanoSeconds: 1000000000,
+                       },
+                       expected: 0,
+               },
+               {
+                       name: "negative time delta",
+                       prev: CPUSample{
+                               Timestamp:            now.Add(time.Second),
+                               UsageCoreNanoSeconds: 0,
+                       },
+                       cur: CPUSample{
+                               Timestamp:            now, // earlier than prev
+                               UsageCoreNanoSeconds: 1000000000,
+                       },
+                       expected: 0,
+               },
+               {
+                       name: "CPU usage decreased (container restart)",
+                       prev: CPUSample{
+                               Timestamp:            now,
+                               UsageCoreNanoSeconds: 2000000000,
+                       },
+                       cur: CPUSample{
+                               Timestamp:            now.Add(time.Second),
+                               UsageCoreNanoSeconds: 1000000000, // less than prev
+                       },
+                       expected: 0,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       result := calculateUsageNanoCores(tt.prev, tt.cur)
+                       if result != tt.expected {
+                               t.Errorf("calculateUsageNanoCores() = %d, want %d", result, tt.expected)
+                       }
+               })
+       }
+}
index 73626b1af873fa5e7e75a1e1949a83d86fd52714..8f9ee0f7c72bb1aa0d7689c3ccb21ce9f6492b3f 100644 (file)
@@ -18,6 +18,15 @@ package store
 
 import "sync"
 
+// StatsCollector is an interface for managing container/sandbox stats.
+// The stats collector tracks CPU usage samples for calculating UsageNanoCores.
+type StatsCollector interface {
+       // AddContainer creates a stats store for the container/sandbox.
+       AddContainer(id string)
+       // RemoveContainer removes the stats store for the container/sandbox.
+       RemoveContainer(id string)
+}
+
 // StopCh is used to propagate the stop information of a container.
 type StopCh struct {
        ch   chan struct{}