adds a background stats collector that calculates `UsageNanoCores` for containers and pod sandboxes.
- run in the background every second to collect CPU metrics for all containers and sandboxes (similar to what cAdvisor does)
- keep a rolling buffer of CPU samples and calculates the instantaneous CPU usage rate from consecutive samples
- read pod-level CPU stats from the parent cgroup rather than the pause container
- add cgroupv2 Pressure Stall Information for CPU, memory, and IO
- add missing `Timestamp` and `Interfaces` fields
when Kubernetes runs with `PodAndContainerStatsFromCRI=true`, it expects `UsageNanoCores` to be set in stats responses.
This value represents how much CPU is being used right now (as opposed to `UsageCoreNanoSeconds` which is cumulative).
To calculate it, we need to compare CPU samples over time to replicate what is in cadvisor.
we can't yet really test this in CI as some changes in kubernetes has to land for `--feature-gates=PodAndContainerStatsFromCRI=true`
Signed-off-by: Davanum Srinivas <davanum@gmail.com>
runs-on: ubuntu-24.04
timeout-minutes: 90
steps:
+ - name: Clean up disk space
+ run: |
+ df -h
+ sudo rm -rf /usr/share/dotnet \
+ /usr/local/graalvm \
+ /usr/local/.ghcup \
+ /usr/local/share/powershell \
+ /usr/local/share/chromium \
+ /usr/local/share/firefox \
+ /usr/local/lib/android \
+ /usr/local/lib/node_modules \
+ /usr/local/share/podman \
+ /usr/local/aws-cli/ \
+ /usr/local/lib/heroku \
+ /usr/local/rustup \
+ /usr/local/cargo \
+ /usr/local/google-cloud-sdk
+ df -h
+
- name: Checkout containerd
uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0
with:
with:
repository: kubernetes/kubernetes
path: src/k8s.io/kubernetes
+ fetch-depth: 0
- name: Install Go
uses: ./src/github.com/containerd/containerd/.github/actions/install-go
cdi_spec_dirs = ['/etc/cdi', '/var/run/cdi']
drain_exec_sync_io_timeout = '0s'
ignore_deprecation_warnings = []
+ stats_collection_period = '1s'
+ stats_retention_period = '2m'
[plugins.'io.containerd.cri.v1.runtime'.containerd]
default_runtime_name = 'runc'
// IgnoreDeprecationWarnings is the list of the deprecation IDs (such as "io.containerd.deprecation/pull-schema-1-image")
// that should be ignored for checking "ContainerdHasNoDeprecationWarnings" condition.
IgnoreDeprecationWarnings []string `toml:"ignore_deprecation_warnings" json:"ignoreDeprecationWarnings"`
+
+ // StatsCollectionPeriod is the period for collecting container/sandbox CPU stats
+ // used for calculating UsageNanoCores. This matches cAdvisor's default housekeeping interval.
+ // The string is in the golang duration format, see:
+ // https://golang.org/pkg/time/#ParseDuration
+ // Default: "1s"
+ StatsCollectionPeriod string `toml:"stats_collection_period" json:"statsCollectionPeriod"`
+
+ // StatsRetentionPeriod is how long to retain CPU stats samples for calculating UsageNanoCores.
+ // The string is in the golang duration format, see:
+ // https://golang.org/pkg/time/#ParseDuration
+ // Default: "2m"
+ StatsRetentionPeriod string `toml:"stats_retention_period" json:"statsRetentionPeriod"`
}
// X509KeyPairStreaming contains the x509 configuration for streaming
return warnings, fmt.Errorf("invalid `drain_exec_sync_io_timeout`: %w", err)
}
}
+ // Validation for stats_collection_period
+ if c.StatsCollectionPeriod != "" {
+ if _, err := time.ParseDuration(c.StatsCollectionPeriod); err != nil {
+ return warnings, fmt.Errorf("invalid `stats_collection_period`: %w", err)
+ }
+ }
+ // Validation for stats_retention_period
+ if c.StatsRetentionPeriod != "" {
+ if _, err := time.ParseDuration(c.StatsRetentionPeriod); err != nil {
+ return warnings, fmt.Errorf("invalid `stats_retention_period`: %w", err)
+ }
+ }
if err := ValidateEnableUnprivileged(ctx, c); err != nil {
return warnings, err
}
}
if cs.stats.Cpu != nil && cs.stats.Cpu.UsageCoreNanoSeconds != nil {
- // this is a calculated value and should be computed for all OSes
+ // UsageNanoCores is a calculated value and should be computed for all OSes
nanoUsage, err := c.getUsageNanoCores(cntr.Metadata.ID, false, cs.stats.Cpu.UsageCoreNanoSeconds.Value, time.Unix(0, cs.stats.Cpu.Timestamp))
if err != nil {
// If an error occurred when getting nano cores usage, skip the container
- log.G(ctx).Warnf("skipping container %q, failed to get metrics handler: %v", cntr.ID, err.Error())
+ log.G(ctx).WithError(err).Warnf("failed to get usage nano cores for container %q", cntr.ID)
continue
}
cs.stats.Cpu.UsageNanoCores = &runtime.UInt64Value{Value: nanoUsage}
}
func (c *criService) getUsageNanoCores(containerID string, isSandbox bool, currentUsageCoreNanoSeconds uint64, currentTimestamp time.Time) (uint64, error) {
+ // First, try to get pre-calculated UsageNanoCores from the background stats collector.
+ // This ensures we have valid data even on the first query (as the collector runs
+ // continuously in the background, similar to cAdvisor's housekeeping).
+ if c.statsCollector != nil {
+ if usageNanoCores, ok := c.statsCollector.GetUsageNanoCores(containerID); ok {
+ return usageNanoCores, nil
+ }
+ }
+
+ // Fall back to the original implementation if the collector doesn't have data.
+ // This can happen for newly created containers that haven't been collected yet.
var oldStats *stats.ContainerStats
if isSandbox {
return containerStats{}, fmt.Errorf("failed to obtain memory stats: %w", err)
}
cs.Memory = memoryStats
- if err != nil {
- return containerStats{}, fmt.Errorf("failed to obtain pid count: %w", err)
- }
+
+ // IO stats (only has PSI for cgroupv2)
+ cs.Io = c.ioContainerStats(metrics, protobuf.FromTimestamp(stats.Timestamp))
return containerStats{&cs, pids}, nil
}
return 0
}
+// convertCg2PSIToCRI converts cgroupv2 PSIStats to CRI PsiStats.
+// cgroupv2 PSI Total is in microseconds, CRI expects nanoseconds.
+func convertCg2PSIToCRI(psi *cg2.PSIStats) *runtime.PsiStats {
+ if psi == nil {
+ return nil
+ }
+
+ result := &runtime.PsiStats{}
+
+ if psi.Full != nil {
+ result.Full = &runtime.PsiData{
+ Total: psi.Full.Total * 1000, // convert microseconds to nanoseconds
+ Avg10: psi.Full.Avg10,
+ Avg60: psi.Full.Avg60,
+ Avg300: psi.Full.Avg300,
+ }
+ }
+
+ if psi.Some != nil {
+ result.Some = &runtime.PsiData{
+ Total: psi.Some.Total * 1000, // convert microseconds to nanoseconds
+ Avg10: psi.Some.Avg10,
+ Avg60: psi.Some.Avg60,
+ Avg300: psi.Some.Avg300,
+ }
+ }
+
+ return result
+}
+
func (c *criService) cpuContainerStats(ID string, isSandbox bool, stats cgroupMetrics, timestamp time.Time) (*runtime.CpuUsage, error) {
switch {
case stats.v1 != nil:
if metrics.CPU != nil {
// convert to nano seconds
usageCoreNanoSeconds := metrics.CPU.UsageUsec * 1000
-
return &runtime.CpuUsage{
Timestamp: timestamp.UnixNano(),
UsageCoreNanoSeconds: &runtime.UInt64Value{Value: usageCoreNanoSeconds},
+ Psi: convertCg2PSIToCRI(metrics.CPU.PSI),
}, nil
}
}
RssBytes: &runtime.UInt64Value{Value: metrics.Memory.Anon},
PageFaults: &runtime.UInt64Value{Value: metrics.Memory.Pgfault},
MajorPageFaults: &runtime.UInt64Value{Value: metrics.Memory.Pgmajfault},
+ Psi: convertCg2PSIToCRI(metrics.Memory.PSI),
}, nil
}
}
return nil, nil
}
+
+func (c *criService) ioContainerStats(stats cgroupMetrics, timestamp time.Time) *runtime.IoUsage {
+ switch {
+ case stats.v1 != nil:
+ // cgroupv1 doesn't have IO PSI stats
+ return nil
+ case stats.v2 != nil:
+ metrics := stats.v2
+ if metrics.Io != nil && metrics.Io.PSI != nil {
+ return &runtime.IoUsage{
+ Timestamp: timestamp.UnixNano(),
+ Psi: convertCg2PSIToCRI(metrics.Io.PSI),
+ }
+ }
+ }
+ return nil
+}
if err != nil {
return nil, fmt.Errorf("failed to obtain cpu stats: %w", err)
}
+ if cpuStats != nil && cpuStats.UsageCoreNanoSeconds != nil {
+ nanoUsage, err := c.getUsageNanoCores(meta.ID, true /* isSandbox */, cpuStats.UsageCoreNanoSeconds.Value, timestamp)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get usage nano cores: %w", err)
+ }
+ cpuStats.UsageNanoCores = &runtime.UInt64Value{Value: nanoUsage}
+ }
podSandboxStats.Linux.Cpu = cpuStats
memoryStats, err := c.memoryContainerStats(meta.ID, *stats, timestamp)
if err != nil {
return nil, fmt.Errorf("failed to obtain network stats: %w", err)
}
+ defaultInterface := &runtime.NetworkInterfaceUsage{
+ Name: defaultIfName,
+ RxBytes: &runtime.UInt64Value{Value: linkStats.RxBytes},
+ RxErrors: &runtime.UInt64Value{Value: linkStats.RxErrors},
+ TxBytes: &runtime.UInt64Value{Value: linkStats.TxBytes},
+ TxErrors: &runtime.UInt64Value{Value: linkStats.TxErrors},
+ }
podSandboxStats.Linux.Network = &runtime.NetworkUsage{
- DefaultInterface: &runtime.NetworkInterfaceUsage{
- Name: defaultIfName,
- RxBytes: &runtime.UInt64Value{Value: linkStats.RxBytes},
- RxErrors: &runtime.UInt64Value{Value: linkStats.RxErrors},
- TxBytes: &runtime.UInt64Value{Value: linkStats.TxBytes},
- TxErrors: &runtime.UInt64Value{Value: linkStats.TxErrors},
- },
+ Timestamp: timestamp.UnixNano(),
+ DefaultInterface: defaultInterface,
+ Interfaces: []*runtime.NetworkInterfaceUsage{defaultInterface},
}
}
runtimeHandlers map[string]*runtime.RuntimeHandler
// runtimeFeatures container runtime features info
runtimeFeatures *runtime.RuntimeFeatures
+ // statsCollector collects CPU stats in background for UsageNanoCores calculation
+ statsCollector *StatsCollector
}
type CRIServiceOptions struct {
labels := label.NewStore()
config := options.RuntimeService.Config()
+ // Create the stats collector first so it can be passed to the stores
+ statsCollector := NewStatsCollector(config)
+
c := &criService{
RuntimeService: options.RuntimeService,
ImageService: options.ImageService,
client: options.Client,
imageFSPaths: options.ImageService.ImageFSPaths(),
os: osinterface.RealOS{},
- sandboxStore: sandboxstore.NewStore(labels),
- containerStore: containerstore.NewStore(labels),
+ sandboxStore: sandboxstore.NewStore(labels, statsCollector),
+ containerStore: containerstore.NewStore(labels, statsCollector),
sandboxNameIndex: registrar.NewRegistrar(),
containerNameIndex: registrar.NewRegistrar(),
netPlugin: make(map[string]cni.CNI),
sandboxService: newCriSandboxService(&config, options.SandboxControllers),
runtimeHandlers: make(map[string]*runtime.RuntimeHandler),
+ statsCollector: statsCollector,
}
// TODO: Make discard time configurable
// then you have to manually filter namespace foo
c.eventMonitor.Subscribe(c.client, []string{`topic=="/tasks/oom"`, `topic~="/images/"`})
+ // Start the background stats collector for UsageNanoCores calculation
+ log.L.Info("Start stats collector")
+ if c.statsCollector != nil {
+ c.statsCollector.SetService(c)
+ c.statsCollector.Start()
+ }
+
log.L.Infof("Start recovering state")
if err := c.recover(ctrdutil.NamespacedContext()); err != nil {
return fmt.Errorf("failed to recover state: %w", err)
}
}
c.eventMonitor.Stop()
+ if c.statsCollector != nil {
+ c.statsCollector.Stop()
+ }
if err := c.streamServer.Stop(); err != nil {
return fmt.Errorf("failed to stop stream server: %w", err)
}
service := &criService{
config: testConfig,
os: ostesting.NewFakeOS(),
- sandboxStore: sandboxstore.NewStore(labels),
+ sandboxStore: sandboxstore.NewStore(labels, nil),
sandboxNameIndex: registrar.NewRegistrar(),
- containerStore: containerstore.NewStore(labels),
+ containerStore: containerstore.NewStore(labels, nil),
containerNameIndex: registrar.NewRegistrar(),
netPlugin: map[string]cni.CNI{
defaultNetworkPlugin: servertesting.NewFakeCNIPlugin(),
--- /dev/null
+//go:build linux
+
+/*
+ Copyright The containerd Authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package server
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "github.com/containerd/cgroups/v3"
+ "github.com/containerd/cgroups/v3/cgroup1"
+ cg1 "github.com/containerd/cgroups/v3/cgroup1/stats"
+ cgroupsv2 "github.com/containerd/cgroups/v3/cgroup2"
+ cg2 "github.com/containerd/cgroups/v3/cgroup2/stats"
+ "github.com/containerd/log"
+ "github.com/containerd/typeurl/v2"
+
+ "github.com/containerd/containerd/api/services/tasks/v1"
+ criconfig "github.com/containerd/containerd/v2/internal/cri/config"
+ "github.com/containerd/containerd/v2/internal/cri/store/stats"
+ ctrdutil "github.com/containerd/containerd/v2/internal/cri/util"
+)
+
+const (
+ // defaultCollectionInterval is how often the collector fetches metrics.
+ // This matches cAdvisor's default housekeeping interval.
+ defaultCollectionInterval = 1 * time.Second
+
+ // defaultStatsAge is how long stats samples are retained.
+ defaultStatsAge = 2 * time.Minute
+)
+
+// StatsCollector periodically collects CPU stats for containers and sandboxes,
+// storing them in TimedStores. This allows UsageNanoCores to be calculated
+// from historical samples, similar to how cAdvisor works.
+type StatsCollector struct {
+ mu sync.RWMutex
+ // stores maps container/sandbox ID to their TimedStore
+ stores map[string]*stats.TimedStore
+ // interval is how often to collect stats
+ interval time.Duration
+ // statsAge is how long to keep samples
+ statsAge time.Duration
+ // maxSamples is the max number of samples per container
+ maxSamples int
+ // stopCh is used to stop the collection loop
+ stopCh chan struct{}
+ // doneCh signals when the collection loop has stopped
+ doneCh chan struct{}
+ // stopOnce ensures Stop() only closes stopCh once
+ stopOnce sync.Once
+ // service provides access to task metrics
+ service *criService
+}
+
+// NewStatsCollector creates a new StatsCollector.
+// The service must be set via SetService before calling Start.
+func NewStatsCollector(config criconfig.Config) *StatsCollector {
+ interval := defaultCollectionInterval
+ statsAge := defaultStatsAge
+
+ // Use config values if provided
+ if config.StatsCollectionPeriod != "" {
+ if d, err := time.ParseDuration(config.StatsCollectionPeriod); err == nil {
+ interval = d
+ }
+ }
+ if config.StatsRetentionPeriod != "" {
+ if d, err := time.ParseDuration(config.StatsRetentionPeriod); err == nil {
+ statsAge = d
+ }
+ }
+
+ // Calculate maxSamples from statsAge and interval
+ maxSamples := int(statsAge / interval)
+ if maxSamples < 2 {
+ maxSamples = 2 // Need at least 2 samples to calculate rate
+ }
+
+ return &StatsCollector{
+ stores: make(map[string]*stats.TimedStore),
+ interval: interval,
+ statsAge: statsAge,
+ maxSamples: maxSamples,
+ stopCh: make(chan struct{}),
+ doneCh: make(chan struct{}),
+ }
+}
+
+// SetService sets the criService reference. Must be called before Start.
+func (c *StatsCollector) SetService(service *criService) {
+ c.service = service
+}
+
+// Start begins the background stats collection loop.
+func (c *StatsCollector) Start() {
+ go c.collectLoop()
+}
+
+// Stop stops the background stats collection loop and waits for it to finish.
+// It is safe to call Stop multiple times.
+func (c *StatsCollector) Stop() {
+ c.stopOnce.Do(func() {
+ close(c.stopCh)
+ })
+ <-c.doneCh
+}
+
+// collectLoop periodically collects stats for all containers and sandboxes.
+func (c *StatsCollector) collectLoop() {
+ ticker := time.NewTicker(c.interval)
+ defer ticker.Stop()
+ defer close(c.doneCh)
+
+ // Do an initial collection immediately
+ c.collect()
+
+ for {
+ select {
+ case <-c.stopCh:
+ return
+ case <-ticker.C:
+ c.collect()
+ }
+ }
+}
+
+// collect fetches metrics for all containers and sandboxes and stores them.
+func (c *StatsCollector) collect() {
+ // Use namespaced context to access containers in the k8s.io namespace
+ ctx := ctrdutil.NamespacedContext()
+
+ // Collect container stats
+ c.collectContainerStats(ctx)
+
+ // Collect sandbox stats (Linux only, uses cgroup metrics)
+ c.collectSandboxStats(ctx)
+}
+
+// collectContainerStats fetches metrics for all containers.
+func (c *StatsCollector) collectContainerStats(ctx context.Context) {
+ containers := c.service.containerStore.List()
+ if len(containers) == 0 {
+ return
+ }
+
+ // Build request for all containers
+ req := &tasks.MetricsRequest{}
+ for _, cntr := range containers {
+ req.Filters = append(req.Filters, "id=="+cntr.ID)
+ }
+
+ resp, err := c.service.client.TaskService().Metrics(ctx, req)
+ if err != nil {
+ log.G(ctx).WithError(err).Debug("StatsCollector: failed to fetch container metrics")
+ return
+ }
+
+ timestamp := time.Now()
+ for _, metric := range resp.Metrics {
+ usageCoreNanoSeconds, ok := extractCPUUsage(metric.Data)
+ if !ok {
+ continue
+ }
+ c.addSample(metric.ID, timestamp, usageCoreNanoSeconds)
+ }
+}
+
+// collectSandboxStats fetches metrics for all sandboxes.
+// On Linux, sandbox/pod stats come from the parent cgroup (which includes all
+// containers in the pod), not from the pause container's task metrics.
+// This matches how sandbox_stats_linux.go retrieves pod-level CPU stats.
+func (c *StatsCollector) collectSandboxStats(ctx context.Context) {
+ sandboxes := c.service.sandboxStore.List()
+ if len(sandboxes) == 0 {
+ return
+ }
+
+ timestamp := time.Now()
+ for _, sb := range sandboxes {
+ // Get the parent cgroup path for the pod
+ cgroupPath := sb.Config.GetLinux().GetCgroupParent()
+ if cgroupPath == "" {
+ continue
+ }
+
+ usageCoreNanoSeconds, ok := c.getCgroupCPUUsage(ctx, cgroupPath)
+ if !ok {
+ continue
+ }
+ c.addSample(sb.ID, timestamp, usageCoreNanoSeconds)
+ }
+}
+
+// getCgroupCPUUsage reads CPU usage from a cgroup path.
+// Supports both cgroupv1 and cgroupv2.
+func (c *StatsCollector) getCgroupCPUUsage(ctx context.Context, cgroupPath string) (uint64, bool) {
+ switch cgroups.Mode() {
+ case cgroups.Unified:
+ cg, err := cgroupsv2.Load(cgroupPath)
+ if err != nil {
+ log.G(ctx).WithError(err).Debugf("StatsCollector: failed to load cgroupv2: %s", cgroupPath)
+ return 0, false
+ }
+ stats, err := cg.Stat()
+ if err != nil {
+ log.G(ctx).WithError(err).Debugf("StatsCollector: failed to get cgroupv2 stats: %s", cgroupPath)
+ return 0, false
+ }
+ if stats.CPU != nil {
+ // cgroupv2 reports in microseconds, convert to nanoseconds
+ return stats.CPU.UsageUsec * 1000, true
+ }
+ default:
+ control, err := cgroup1.Load(cgroup1.StaticPath(cgroupPath))
+ if err != nil {
+ log.G(ctx).WithError(err).Debugf("StatsCollector: failed to load cgroupv1: %s", cgroupPath)
+ return 0, false
+ }
+ stats, err := control.Stat(cgroup1.IgnoreNotExist)
+ if err != nil {
+ log.G(ctx).WithError(err).Debugf("StatsCollector: failed to get cgroupv1 stats: %s", cgroupPath)
+ return 0, false
+ }
+ if stats.CPU != nil && stats.CPU.Usage != nil {
+ return stats.CPU.Usage.Total, true
+ }
+ }
+ return 0, false
+}
+
+// addSample adds a CPU sample for the given container/sandbox ID.
+// If a store doesn't exist for the ID, one is created (fallback for containers
+// that existed before the collector started, e.g., after a restart).
+func (c *StatsCollector) addSample(id string, timestamp time.Time, usageCoreNanoSeconds uint64) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ store, ok := c.stores[id]
+ if !ok {
+ store = stats.NewTimedStore(c.statsAge, c.maxSamples)
+ c.stores[id] = store
+ }
+ store.Add(timestamp, usageCoreNanoSeconds)
+}
+
+// GetUsageNanoCores returns the latest calculated UsageNanoCores for the given
+// container/sandbox ID. Returns 0 and false if no data is available or if
+// there aren't enough samples to calculate the rate.
+func (c *StatsCollector) GetUsageNanoCores(id string) (uint64, bool) {
+ c.mu.RLock()
+ store, ok := c.stores[id]
+ c.mu.RUnlock()
+
+ if !ok {
+ return 0, false
+ }
+
+ return store.GetLatestUsageNanoCores()
+}
+
+// GetLatestSample returns the latest CPU sample for the given container/sandbox ID.
+// Returns nil if no data is available.
+func (c *StatsCollector) GetLatestSample(id string) *stats.CPUSample {
+ c.mu.RLock()
+ store, ok := c.stores[id]
+ c.mu.RUnlock()
+
+ if !ok {
+ return nil
+ }
+
+ return store.GetLatest()
+}
+
+// AddContainer creates a stats store for a container/sandbox.
+// Should be called when a container or sandbox is added to the store.
+func (c *StatsCollector) AddContainer(id string) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if _, ok := c.stores[id]; !ok {
+ c.stores[id] = stats.NewTimedStore(c.statsAge, c.maxSamples)
+ }
+}
+
+// RemoveContainer removes the stats store for a container/sandbox.
+// Should be called when a container or sandbox is removed from the store.
+func (c *StatsCollector) RemoveContainer(id string) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ delete(c.stores, id)
+}
+
+// extractCPUUsage extracts the CPU usage in nanoseconds from the metric data.
+// Supports both cgroupv1 and cgroupv2 metrics.
+func extractCPUUsage(data typeurl.Any) (uint64, bool) {
+ if data == nil {
+ return 0, false
+ }
+
+ taskMetrics, err := typeurl.UnmarshalAny(data)
+ if err != nil {
+ return 0, false
+ }
+
+ switch v := taskMetrics.(type) {
+ case *cg1.Metrics:
+ if v.CPU != nil && v.CPU.Usage != nil {
+ return v.CPU.Usage.Total, true
+ }
+ case *cg2.Metrics:
+ if v.CPU != nil {
+ // cgroupv2 reports in microseconds, convert to nanoseconds
+ return v.CPU.UsageUsec * 1000, true
+ }
+ }
+
+ return 0, false
+}
--- /dev/null
+//go:build !linux
+
+/*
+ Copyright The containerd Authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package server
+
+import (
+ criconfig "github.com/containerd/containerd/v2/internal/cri/config"
+ "github.com/containerd/containerd/v2/internal/cri/store/stats"
+)
+
+// StatsCollector is a stub for non-Linux platforms.
+// On Linux, the real implementation periodically collects CPU stats for
+// containers and sandboxes, storing them in TimedStores.
+type StatsCollector struct{}
+
+// NewStatsCollector creates a new StatsCollector (stub on non-Linux).
+func NewStatsCollector(config criconfig.Config) *StatsCollector {
+ return &StatsCollector{}
+}
+
+// SetService sets the criService reference (no-op on non-Linux).
+func (c *StatsCollector) SetService(service *criService) {}
+
+// Start begins the background stats collection loop (no-op on non-Linux).
+func (c *StatsCollector) Start() {}
+
+// Stop stops the background stats collection loop (no-op on non-Linux).
+func (c *StatsCollector) Stop() {}
+
+// GetUsageNanoCores returns the latest calculated UsageNanoCores for the given
+// container/sandbox ID. Returns 0 and false on non-Linux platforms.
+func (c *StatsCollector) GetUsageNanoCores(id string) (uint64, bool) {
+ return 0, false
+}
+
+// GetLatestSample returns the latest CPU sample for the given container/sandbox ID.
+// Returns nil on non-Linux platforms.
+func (c *StatsCollector) GetLatestSample(id string) *stats.CPUSample {
+ return nil
+}
+
+// AddContainer creates a stats store for a container/sandbox (no-op on non-Linux).
+func (c *StatsCollector) AddContainer(id string) {}
+
+// RemoveContainer removes the stats store for a container/sandbox (no-op on non-Linux).
+func (c *StatsCollector) RemoveContainer(id string) {}
// Store stores all Containers.
type Store struct {
- lock sync.RWMutex
- containers map[string]Container
- idIndex *truncindex.TruncIndex
- labels *label.Store
+ lock sync.RWMutex
+ containers map[string]Container
+ idIndex *truncindex.TruncIndex
+ labels *label.Store
+ statsCollector store.StatsCollector
}
// NewStore creates a container store.
-func NewStore(labels *label.Store) *Store {
+func NewStore(labels *label.Store, statsCollector store.StatsCollector) *Store {
return &Store{
- containers: make(map[string]Container),
- idIndex: truncindex.NewTruncIndex([]string{}),
- labels: labels,
+ containers: make(map[string]Container),
+ idIndex: truncindex.NewTruncIndex([]string{}),
+ labels: labels,
+ statsCollector: statsCollector,
}
}
return err
}
s.containers[c.ID] = c
+ if s.statsCollector != nil {
+ s.statsCollector.AddContainer(c.ID)
+ }
return nil
}
s.labels.Release(c.ProcessLabel)
s.idIndex.Delete(id)
delete(s.containers, id)
+ if s.statsCollector != nil {
+ s.statsCollector.RemoveContainer(id)
+ }
}
containers[id] = container
}
- s := NewStore(label.NewStore())
+ s := NewStore(label.NewStore(), nil)
reserved := map[string]bool{}
s.labels.Reserver = func(label string) {
reserved[strings.SplitN(label, ":", 4)[3]] = true
// Store stores all sandboxes.
type Store struct {
- lock sync.RWMutex
- sandboxes map[string]Sandbox
- idIndex *truncindex.TruncIndex
- labels *label.Store
+ lock sync.RWMutex
+ sandboxes map[string]Sandbox
+ idIndex *truncindex.TruncIndex
+ labels *label.Store
+ statsCollector store.StatsCollector
}
// NewStore creates a sandbox store.
-func NewStore(labels *label.Store) *Store {
+func NewStore(labels *label.Store, statsCollector store.StatsCollector) *Store {
return &Store{
- sandboxes: make(map[string]Sandbox),
- idIndex: truncindex.NewTruncIndex([]string{}),
- labels: labels,
+ sandboxes: make(map[string]Sandbox),
+ idIndex: truncindex.NewTruncIndex([]string{}),
+ labels: labels,
+ statsCollector: statsCollector,
}
}
return err
}
s.sandboxes[sb.ID] = sb
+ if s.statsCollector != nil {
+ s.statsCollector.AddContainer(sb.ID)
+ }
return nil
}
s.labels.Release(s.sandboxes[id].ProcessLabel)
s.idIndex.Delete(id)
delete(s.sandboxes, id)
+ if s.statsCollector != nil {
+ s.statsCollector.RemoveContainer(id)
+ }
}
},
}
assert := assertlib.New(t)
- s := NewStore(label.NewStore())
+ s := NewStore(label.NewStore(), nil)
t.Logf("should be able to add sandbox")
for _, sb := range sandboxes {
--- /dev/null
+/*
+ Copyright The containerd Authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package stats
+
+import (
+ "sort"
+ "sync"
+ "time"
+)
+
+// CPUSample holds a single CPU usage sample with timestamp.
+type CPUSample struct {
+ Timestamp time.Time
+ UsageCoreNanoSeconds uint64
+ // UsageNanoCores is the instantaneous CPU usage rate calculated
+ // from this sample and the previous sample.
+ UsageNanoCores uint64
+}
+
+// timedStoreDataSlice is a time-ordered slice of CPU samples.
+type timedStoreDataSlice []CPUSample
+
+func (t timedStoreDataSlice) Less(i, j int) bool {
+ return t[i].Timestamp.Before(t[j].Timestamp)
+}
+
+func (t timedStoreDataSlice) Len() int {
+ return len(t)
+}
+
+func (t timedStoreDataSlice) Swap(i, j int) {
+ t[i], t[j] = t[j], t[i]
+}
+
+// TimedStore is a time-based buffer for CPU stats samples.
+// It stores samples for a specific time period and/or a max number of items,
+// similar to cAdvisor's TimedStore but specialized for CPU stats.
+// All methods are thread-safe.
+type TimedStore struct {
+ mu sync.RWMutex
+ buffer timedStoreDataSlice
+ age time.Duration
+ maxItems int
+}
+
+// NewTimedStore creates a new TimedStore.
+// age specifies how long samples are retained.
+// maxItems specifies the maximum number of samples to keep (-1 for no limit).
+func NewTimedStore(age time.Duration, maxItems int) *TimedStore {
+ capacity := maxItems
+ if capacity < 0 {
+ capacity = 0 // Will grow dynamically
+ }
+ return &TimedStore{
+ buffer: make(timedStoreDataSlice, 0, capacity),
+ age: age,
+ maxItems: maxItems,
+ }
+}
+
+// Add adds a new CPU sample to the store.
+// It calculates UsageNanoCores from the previous sample if available.
+// Samples are stored in timestamp order, and old samples are evicted
+// based on age and maxItems constraints.
+func (s *TimedStore) Add(timestamp time.Time, usageCoreNanoSeconds uint64) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ sample := CPUSample{
+ Timestamp: timestamp,
+ UsageCoreNanoSeconds: usageCoreNanoSeconds,
+ }
+
+ // Calculate UsageNanoCores from the previous sample if we have one
+ if len(s.buffer) > 0 {
+ lastSample := s.buffer[len(s.buffer)-1]
+ sample.UsageNanoCores = calculateUsageNanoCores(lastSample, sample)
+ }
+
+ // Common case: data is added in order
+ if len(s.buffer) == 0 || !timestamp.Before(s.buffer[len(s.buffer)-1].Timestamp) {
+ s.buffer = append(s.buffer, sample)
+ } else {
+ // Data is out of order; insert it in the correct position
+ index := sort.Search(len(s.buffer), func(index int) bool {
+ return s.buffer[index].Timestamp.After(timestamp)
+ })
+ s.buffer = append(s.buffer, CPUSample{}) // Make room
+ copy(s.buffer[index+1:], s.buffer[index:])
+ s.buffer[index] = sample
+ // Recalculate UsageNanoCores for this and the next sample
+ if index > 0 {
+ s.buffer[index].UsageNanoCores = calculateUsageNanoCores(s.buffer[index-1], s.buffer[index])
+ }
+ if index+1 < len(s.buffer) {
+ s.buffer[index+1].UsageNanoCores = calculateUsageNanoCores(s.buffer[index], s.buffer[index+1])
+ }
+ }
+
+ // Remove any elements before eviction time
+ evictTime := timestamp.Add(-s.age)
+ index := sort.Search(len(s.buffer), func(index int) bool {
+ return s.buffer[index].Timestamp.After(evictTime)
+ })
+ if index < len(s.buffer) && index > 0 {
+ s.buffer = s.buffer[index:]
+ }
+
+ // Remove any elements if over max size
+ if s.maxItems >= 0 && len(s.buffer) > s.maxItems {
+ startIndex := len(s.buffer) - s.maxItems
+ s.buffer = s.buffer[startIndex:]
+ }
+}
+
+// GetLatest returns the most recent sample, or nil if no samples exist.
+func (s *TimedStore) GetLatest() *CPUSample {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ if len(s.buffer) == 0 {
+ return nil
+ }
+ sample := s.buffer[len(s.buffer)-1]
+ return &sample
+}
+
+// GetLatestUsageNanoCores returns the most recent UsageNanoCores value.
+// Returns 0 and false if no samples with calculated UsageNanoCores exist.
+func (s *TimedStore) GetLatestUsageNanoCores() (uint64, bool) {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+
+ // We need at least 2 samples to have a calculated UsageNanoCores
+ if len(s.buffer) < 2 {
+ return 0, false
+ }
+
+ sample := s.buffer[len(s.buffer)-1]
+ return sample.UsageNanoCores, true
+}
+
+// Size returns the number of samples in the store.
+func (s *TimedStore) Size() int {
+ s.mu.RLock()
+ defer s.mu.RUnlock()
+ return len(s.buffer)
+}
+
+// calculateUsageNanoCores calculates the instantaneous CPU usage rate
+// (nanocores) from two consecutive samples.
+// Returns 0 if the calculation cannot be performed (e.g., invalid time delta).
+func calculateUsageNanoCores(prev, cur CPUSample) uint64 {
+ nanoSeconds := cur.Timestamp.UnixNano() - prev.Timestamp.UnixNano()
+
+ // Zero or negative interval
+ if nanoSeconds <= 0 {
+ return 0
+ }
+
+ // CPU usage can't go backwards (might happen if container was restarted)
+ if cur.UsageCoreNanoSeconds < prev.UsageCoreNanoSeconds {
+ return 0
+ }
+
+ usageDelta := cur.UsageCoreNanoSeconds - prev.UsageCoreNanoSeconds
+ return uint64(float64(usageDelta) / float64(nanoSeconds) * float64(time.Second/time.Nanosecond))
+}
--- /dev/null
+/*
+ Copyright The containerd Authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package stats
+
+import (
+ "testing"
+ "time"
+)
+
+func TestTimedStoreBasic(t *testing.T) {
+ store := NewTimedStore(time.Minute, 10)
+
+ // Initially empty
+ if store.Size() != 0 {
+ t.Errorf("expected size 0, got %d", store.Size())
+ }
+
+ // GetLatest should return nil when empty
+ if sample := store.GetLatest(); sample != nil {
+ t.Errorf("expected nil sample, got %v", sample)
+ }
+
+ // GetLatestUsageNanoCores should return false when empty
+ if _, ok := store.GetLatestUsageNanoCores(); ok {
+ t.Error("expected false for empty store")
+ }
+}
+
+func TestTimedStoreAddAndGet(t *testing.T) {
+ store := NewTimedStore(time.Minute, 10)
+
+ now := time.Now()
+
+ // Add first sample
+ store.Add(now, 1000000000) // 1 second of CPU time in nanoseconds
+
+ if store.Size() != 1 {
+ t.Errorf("expected size 1, got %d", store.Size())
+ }
+
+ sample := store.GetLatest()
+ if sample == nil {
+ t.Fatal("expected non-nil sample")
+ }
+ if sample.UsageCoreNanoSeconds != 1000000000 {
+ t.Errorf("expected UsageCoreNanoSeconds 1000000000, got %d", sample.UsageCoreNanoSeconds)
+ }
+ // First sample should have 0 UsageNanoCores (no previous sample to calculate from)
+ if sample.UsageNanoCores != 0 {
+ t.Errorf("expected UsageNanoCores 0 for first sample, got %d", sample.UsageNanoCores)
+ }
+
+ // GetLatestUsageNanoCores should still return false (need at least 2 samples)
+ if _, ok := store.GetLatestUsageNanoCores(); ok {
+ t.Error("expected false with only 1 sample")
+ }
+}
+
+func TestTimedStoreUsageNanoCoresCalculation(t *testing.T) {
+ store := NewTimedStore(time.Minute, 10)
+
+ now := time.Now()
+
+ // Add first sample: 1 second of CPU time
+ store.Add(now, 1000000000)
+
+ // Add second sample after 1 second: 2 seconds of CPU time
+ // This means the CPU was running at 100% (1 core) during that second
+ store.Add(now.Add(time.Second), 2000000000)
+
+ if store.Size() != 2 {
+ t.Errorf("expected size 2, got %d", store.Size())
+ }
+
+ // Now we should be able to get UsageNanoCores
+ usageNanoCores, ok := store.GetLatestUsageNanoCores()
+ if !ok {
+ t.Fatal("expected true with 2 samples")
+ }
+
+ // Expected: (2000000000 - 1000000000) / 1000000000 * 1000000000 = 1000000000 nanocores (1 core)
+ expected := uint64(1000000000)
+ if usageNanoCores != expected {
+ t.Errorf("expected UsageNanoCores %d, got %d", expected, usageNanoCores)
+ }
+}
+
+func TestTimedStoreHalfCoreUsage(t *testing.T) {
+ store := NewTimedStore(time.Minute, 10)
+
+ now := time.Now()
+
+ // Add first sample
+ store.Add(now, 0)
+
+ // Add second sample after 2 seconds: 1 second of CPU time
+ // This means the CPU was running at 50% (0.5 cores) during those 2 seconds
+ store.Add(now.Add(2*time.Second), 1000000000)
+
+ usageNanoCores, ok := store.GetLatestUsageNanoCores()
+ if !ok {
+ t.Fatal("expected true with 2 samples")
+ }
+
+ // Expected: 1000000000 / 2000000000 * 1000000000 = 500000000 nanocores (0.5 cores)
+ expected := uint64(500000000)
+ if usageNanoCores != expected {
+ t.Errorf("expected UsageNanoCores %d, got %d", expected, usageNanoCores)
+ }
+}
+
+func TestTimedStoreMaxItems(t *testing.T) {
+ store := NewTimedStore(time.Hour, 3) // Keep at most 3 samples
+
+ now := time.Now()
+
+ // Add 5 samples
+ for i := 0; i < 5; i++ {
+ store.Add(now.Add(time.Duration(i)*time.Second), uint64(i)*1000000000)
+ }
+
+ // Should only have 3 samples
+ if store.Size() != 3 {
+ t.Errorf("expected size 3, got %d", store.Size())
+ }
+
+ // Latest sample should be the 5th one (index 4)
+ sample := store.GetLatest()
+ if sample == nil {
+ t.Fatal("expected non-nil sample")
+ }
+ if sample.UsageCoreNanoSeconds != 4000000000 {
+ t.Errorf("expected UsageCoreNanoSeconds 4000000000, got %d", sample.UsageCoreNanoSeconds)
+ }
+}
+
+func TestTimedStoreAgeEviction(t *testing.T) {
+ store := NewTimedStore(3*time.Second, -1) // Keep samples for 3 seconds, no item limit
+
+ now := time.Now()
+
+ // Add sample at t=0
+ store.Add(now, 0)
+
+ // Add sample at t=1s
+ store.Add(now.Add(1*time.Second), 1000000000)
+
+ // Add sample at t=2s
+ store.Add(now.Add(2*time.Second), 2000000000)
+
+ if store.Size() != 3 {
+ t.Errorf("expected size 3, got %d", store.Size())
+ }
+
+ // Add sample at t=4s - this should evict t=0 sample only
+ // evictTime = t=4s - 3s = t=1s, so samples strictly after t=1s are kept (t=2s, t=4s)
+ store.Add(now.Add(4*time.Second), 4000000000)
+
+ // Should have t=2s and t=4s samples (samples strictly after evictTime t=1s)
+ if store.Size() != 2 {
+ t.Errorf("expected size 2 after eviction, got %d", store.Size())
+ }
+
+ // Verify latest sample is the one we just added
+ sample := store.GetLatest()
+ if sample == nil {
+ t.Fatal("expected non-nil sample")
+ }
+ if sample.UsageCoreNanoSeconds != 4000000000 {
+ t.Errorf("expected UsageCoreNanoSeconds 4000000000, got %d", sample.UsageCoreNanoSeconds)
+ }
+}
+
+func TestTimedStoreConcurrentAccess(t *testing.T) {
+ store := NewTimedStore(time.Minute, 100)
+
+ now := time.Now()
+ done := make(chan bool)
+
+ // Start multiple goroutines adding samples
+ for i := 0; i < 10; i++ {
+ go func(id int) {
+ for j := 0; j < 100; j++ {
+ store.Add(now.Add(time.Duration(id*100+j)*time.Millisecond), uint64(id*100+j)*1000000)
+ }
+ done <- true
+ }(i)
+ }
+
+ // Start multiple goroutines reading
+ for i := 0; i < 10; i++ {
+ go func() {
+ for j := 0; j < 100; j++ {
+ store.GetLatest()
+ store.GetLatestUsageNanoCores()
+ store.Size()
+ }
+ done <- true
+ }()
+ }
+
+ // Wait for all goroutines
+ for i := 0; i < 20; i++ {
+ <-done
+ }
+
+ // Just verify no panic occurred and size is reasonable
+ if store.Size() > 100 {
+ t.Errorf("expected size <= 100, got %d", store.Size())
+ }
+}
+
+func TestCalculateUsageNanoCores(t *testing.T) {
+ // Use a fixed base time to ensure consistent time deltas in tests
+ now := time.Now()
+
+ tests := []struct {
+ name string
+ prev CPUSample
+ cur CPUSample
+ expected uint64
+ }{
+ {
+ name: "normal case - 1 core",
+ prev: CPUSample{
+ Timestamp: now,
+ UsageCoreNanoSeconds: 0,
+ },
+ cur: CPUSample{
+ Timestamp: now.Add(time.Second),
+ UsageCoreNanoSeconds: 1000000000,
+ },
+ expected: 1000000000, // 1 core
+ },
+ {
+ name: "normal case - 2 cores",
+ prev: CPUSample{
+ Timestamp: now,
+ UsageCoreNanoSeconds: 0,
+ },
+ cur: CPUSample{
+ Timestamp: now.Add(time.Second),
+ UsageCoreNanoSeconds: 2000000000,
+ },
+ expected: 2000000000, // 2 cores
+ },
+ {
+ name: "zero time delta",
+ prev: CPUSample{
+ Timestamp: now,
+ UsageCoreNanoSeconds: 0,
+ },
+ cur: CPUSample{
+ Timestamp: now, // same timestamp
+ UsageCoreNanoSeconds: 1000000000,
+ },
+ expected: 0,
+ },
+ {
+ name: "negative time delta",
+ prev: CPUSample{
+ Timestamp: now.Add(time.Second),
+ UsageCoreNanoSeconds: 0,
+ },
+ cur: CPUSample{
+ Timestamp: now, // earlier than prev
+ UsageCoreNanoSeconds: 1000000000,
+ },
+ expected: 0,
+ },
+ {
+ name: "CPU usage decreased (container restart)",
+ prev: CPUSample{
+ Timestamp: now,
+ UsageCoreNanoSeconds: 2000000000,
+ },
+ cur: CPUSample{
+ Timestamp: now.Add(time.Second),
+ UsageCoreNanoSeconds: 1000000000, // less than prev
+ },
+ expected: 0,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ result := calculateUsageNanoCores(tt.prev, tt.cur)
+ if result != tt.expected {
+ t.Errorf("calculateUsageNanoCores() = %d, want %d", result, tt.expected)
+ }
+ })
+ }
+}
import "sync"
+// StatsCollector is an interface for managing container/sandbox stats.
+// The stats collector tracks CPU usage samples for calculating UsageNanoCores.
+type StatsCollector interface {
+ // AddContainer creates a stats store for the container/sandbox.
+ AddContainer(id string)
+ // RemoveContainer removes the stats store for the container/sandbox.
+ RemoveContainer(id string)
+}
+
// StopCh is used to propagate the stop information of a container.
type StopCh struct {
ch chan struct{}