]> git.feebdaed.xyz Git - 0xmirror/cilium.git/commitdiff
clustermesh-apiserver: Implement global namespace filtering in synchronizer
authorAnubhab Majumdar <anmajumdar@microsoft.com>
Tue, 16 Dec 2025 23:33:11 +0000 (17:33 -0600)
committerArthur Outhenin-Chalandre <git@mrfreezeex.fr>
Fri, 19 Dec 2025 17:30:53 +0000 (17:30 +0000)
Add namespace filtering to synchronizer to only sync resources from
global namespaces.

- Add Namespacer interface and implementations for namespace extraction
- Integrate namespace manager into synchronizer to filter events
- Watch namespace changes and resynchronize affected resources
- Mark CiliumIdentity, CiliumEndpoint, and CiliumEndpointSlice as namespaced

Signed-off-by: Anubhab Majumdar <anmajumdar@microsoft.com>
clustermesh-apiserver/clustermesh/cells.go
clustermesh-apiserver/clustermesh/converters.go
clustermesh-apiserver/clustermesh/namespacers.go [new file with mode: 0644]
clustermesh-apiserver/clustermesh/synchronizer.go

index ac624d52c3b373f74ba1fea22b6997da9c5e0421..aca515846e0ab301feedd0c79d0e43aafc3a6a07 100644 (file)
@@ -15,6 +15,7 @@ import (
        clustercfgcell "github.com/cilium/cilium/pkg/clustermesh/clustercfg/cell"
        "github.com/cilium/cilium/pkg/clustermesh/mcsapi"
        mcsapitypes "github.com/cilium/cilium/pkg/clustermesh/mcsapi/types"
+       cmnamespace "github.com/cilium/cilium/pkg/clustermesh/namespace"
        "github.com/cilium/cilium/pkg/defaults"
        "github.com/cilium/cilium/pkg/gops"
        cilium_api_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
@@ -68,6 +69,9 @@ var Synchronization = cell.Module(
        "clustermesh-synchronization",
        "Synchronize information from Kubernetes to KVStore",
 
+       // Provide the namespace manager.
+       cmnamespace.Cell,
+
        cell.Group(
                cell.Provide(
                        func(syncState syncstate.SyncState) operatorWatchers.ServiceSyncConfig {
@@ -101,6 +105,7 @@ var Synchronization = cell.Module(
                cell.Provide(
                        newCiliumIdentityOptions,
                        newCiliumIdentityConverter,
+                       newCiliumIdentityNamespacer,
                ),
                cell.Invoke(RegisterSynchronizer[*cilium_api_v2.CiliumIdentity]),
        ),
@@ -109,6 +114,7 @@ var Synchronization = cell.Module(
                cell.Provide(
                        newCiliumEndpointOptions,
                        newCiliumEndpointConverter,
+                       newCiliumEndpointNamespacer,
                ),
                cell.Invoke(RegisterSynchronizer[*types.CiliumEndpoint]),
        ),
@@ -117,6 +123,7 @@ var Synchronization = cell.Module(
                cell.Provide(
                        newCiliumEndpointSliceOptions,
                        newCiliumEndpointSliceConverter,
+                       newCiliumEndpointSliceNamespacer,
                ),
                cell.Invoke(RegisterSynchronizer[*cilium_api_v2a1.CiliumEndpointSlice]),
        ),
index 3620cbd9b2eadd7334562b5f56bd9b848c47c2c3..95c4f1cb70cc8bce4b764df9e64362e244bf6e6c 100644 (file)
@@ -126,10 +126,11 @@ func (nc *CiliumNodeConverter) Convert(event resource.Event[*cilium_api_v2.Ciliu
 
 func newCiliumIdentityOptions() Options[*cilium_api_v2.CiliumIdentity] {
        return Options[*cilium_api_v2.CiliumIdentity]{
-               Enabled:   true,
-               Resource:  "CiliumIdentity",
-               Prefix:    path.Join(identityCache.IdentitiesPath, "id"),
-               StoreOpts: []store.WSSOpt{store.WSSWithSyncedKeyOverride(identityCache.IdentitiesPath)},
+               Enabled:    true,
+               Resource:   "CiliumIdentity",
+               Prefix:     path.Join(identityCache.IdentitiesPath, "id"),
+               StoreOpts:  []store.WSSOpt{store.WSSWithSyncedKeyOverride(identityCache.IdentitiesPath)},
+               Namespaced: true,
        }
 }
 
@@ -166,10 +167,11 @@ func (ic *CiliumIdentityConverter) Convert(event resource.Event[*cilium_api_v2.C
 
 func newCiliumEndpointOptions(cfg cmk8s.CiliumEndpointSliceConfig) Options[*types.CiliumEndpoint] {
        return Options[*types.CiliumEndpoint]{
-               Enabled:   !cfg.EnableCiliumEndpointSlice,
-               Resource:  "CiliumEndpoint",
-               Prefix:    path.Join(ipcache.IPIdentitiesPath, ipcache.DefaultAddressSpace),
-               StoreOpts: []store.WSSOpt{store.WSSWithSyncedKeyOverride(ipcache.IPIdentitiesPath)},
+               Enabled:    !cfg.EnableCiliumEndpointSlice,
+               Resource:   "CiliumEndpoint",
+               Prefix:     path.Join(ipcache.IPIdentitiesPath, ipcache.DefaultAddressSpace),
+               StoreOpts:  []store.WSSOpt{store.WSSWithSyncedKeyOverride(ipcache.IPIdentitiesPath)},
+               Namespaced: true,
        }
 }
 
@@ -212,10 +214,11 @@ func ciliumEndpointMapper(endpoint *types.CiliumEndpoint) iter.Seq[store.Key] {
 
 func newCiliumEndpointSliceOptions(cfg cmk8s.CiliumEndpointSliceConfig) Options[*cilium_api_v2a1.CiliumEndpointSlice] {
        return Options[*cilium_api_v2a1.CiliumEndpointSlice]{
-               Enabled:   cfg.EnableCiliumEndpointSlice,
-               Resource:  "CiliumEndpointSlice",
-               Prefix:    path.Join(ipcache.IPIdentitiesPath, ipcache.DefaultAddressSpace),
-               StoreOpts: []store.WSSOpt{store.WSSWithSyncedKeyOverride(ipcache.IPIdentitiesPath)},
+               Enabled:    cfg.EnableCiliumEndpointSlice,
+               Resource:   "CiliumEndpointSlice",
+               Prefix:     path.Join(ipcache.IPIdentitiesPath, ipcache.DefaultAddressSpace),
+               StoreOpts:  []store.WSSOpt{store.WSSWithSyncedKeyOverride(ipcache.IPIdentitiesPath)},
+               Namespaced: true,
        }
 }
 
diff --git a/clustermesh-apiserver/clustermesh/namespacers.go b/clustermesh-apiserver/clustermesh/namespacers.go
new file mode 100644 (file)
index 0000000..dc09141
--- /dev/null
@@ -0,0 +1,52 @@
+// SPDX-License-Identifier: Apache-2.0
+// Copyright Authors of Cilium
+
+package clustermesh
+
+import (
+       "k8s.io/apimachinery/pkg/runtime"
+
+       cmk8s "github.com/cilium/cilium/clustermesh-apiserver/clustermesh/k8s"
+       cilium_api_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
+       cilium_api_v2a1 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1"
+       "github.com/cilium/cilium/pkg/k8s/resource"
+       "github.com/cilium/cilium/pkg/k8s/types"
+)
+
+type GenericNamespacer[T runtime.Object] struct {
+       extract func(T) string
+}
+
+func (gn *GenericNamespacer[T]) ExtractNamespace(event resource.Event[T]) (namespace string) {
+       return gn.extract(event.Object)
+}
+
+// ----- CiliumIdentity ----- //
+
+func newCiliumIdentityNamespacer() Namespacer[*cilium_api_v2.CiliumIdentity] {
+       return &GenericNamespacer[*cilium_api_v2.CiliumIdentity]{
+               extract: func(obj *cilium_api_v2.CiliumIdentity) string {
+                       return obj.SecurityLabels[cmk8s.PodPrefixLbl]
+               },
+       }
+}
+
+// ----- CiliumEndpoint ----- //
+
+func newCiliumEndpointNamespacer() Namespacer[*types.CiliumEndpoint] {
+       return &GenericNamespacer[*types.CiliumEndpoint]{
+               extract: func(obj *types.CiliumEndpoint) string {
+                       return obj.Namespace
+               },
+       }
+}
+
+// ----- CiliumEndpointSlice ----- //
+
+func newCiliumEndpointSliceNamespacer() Namespacer[*cilium_api_v2a1.CiliumEndpointSlice] {
+       return &GenericNamespacer[*cilium_api_v2a1.CiliumEndpointSlice]{
+               extract: func(obj *cilium_api_v2a1.CiliumEndpointSlice) string {
+                       return obj.Namespace
+               },
+       }
+}
index c1a530e82948800c4308f62c516f6f60be152554..35595e0d6aa5233a5370809158c7874cdb432ee5 100644 (file)
@@ -13,10 +13,13 @@ import (
        "github.com/cilium/hive/cell"
        "github.com/cilium/hive/job"
        "k8s.io/apimachinery/pkg/runtime"
+       "k8s.io/client-go/tools/cache"
 
        "github.com/cilium/cilium/clustermesh-apiserver/syncstate"
+       cmnamespace "github.com/cilium/cilium/pkg/clustermesh/namespace"
        cmtypes "github.com/cilium/cilium/pkg/clustermesh/types"
        "github.com/cilium/cilium/pkg/k8s/resource"
+       slim_corev1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/api/core/v1"
        "github.com/cilium/cilium/pkg/kvstore"
        "github.com/cilium/cilium/pkg/kvstore/store"
        "github.com/cilium/cilium/pkg/logging/logfields"
@@ -28,6 +31,10 @@ type Options[T runtime.Object] struct {
        Resource  string
        Prefix    string
        StoreOpts []store.WSSOpt
+       // Namespaced indicates whether namespace changes should trigger resynchronization
+       // of all resources of this type. If true, a namespace watcher will be started to monitor
+       // namespace changes and resynchronize resources accordingly. Only required for certain resource types.
+       Namespaced bool
 }
 
 // Converter knows how to convert a given Kubernetes event into the corresponding
@@ -36,6 +43,13 @@ type Converter[T runtime.Object] interface {
        Convert(event resource.Event[T]) (upserts iter.Seq[store.Key], deletes iter.Seq[store.NamedKey])
 }
 
+// Namespacer is an interface that defines methods to handle namespace-related operations
+// for Kubernetes resources in the context of clustermesh synchronization.
+type Namespacer[T runtime.Object] interface {
+       // ExtractNamespace retrieves the namespace of a given event's object.
+       ExtractNamespace(resource.Event[T]) (namespace string)
+}
+
 type syncParams[T runtime.Object] struct {
        cell.In
 
@@ -50,6 +64,10 @@ type syncParams[T runtime.Object] struct {
        Options   Options[T]
        Converter Converter[T]
        SyncState syncstate.SyncState
+
+       NamespaceManager cmnamespace.Manager
+       Namespaces       resource.Resource[*slim_corev1.Namespace]
+       Namespacer       Namespacer[T] `optional:"true"`
 }
 
 // RegisterSynchronizer registers a new synchronizer for the given resource,
@@ -66,44 +84,125 @@ func RegisterSynchronizer[T runtime.Object](in syncParams[T]) {
        store := in.Factory.NewSyncStore(
                in.ClusterInfo.Name, in.Client,
                in.Options.Prefix, in.Options.StoreOpts...)
+
        synced := in.SyncState.WaitForResource()
 
+       // process is a helper function to log and execute store operations.
+       process := func(invoker, op, key string, do func() error) {
+               logger.Info("Updating resource in etcd",
+                       logfields.Reason, invoker,
+                       logfields.Operation, op,
+                       logfields.Key, key,
+               )
+               if err := do(); err != nil {
+                       logger.Warn("Failed updating resource in etcd",
+                               logfields.Error, err,
+                               logfields.Reason, invoker,
+                               logfields.Operation, op,
+                               logfields.Key, key,
+                       )
+               }
+       }
+
        in.JobGroup.Add(
                job.OneShot(
                        fmt.Sprintf("%s-sync", strings.ToLower(in.Options.Resource)),
                        func(ctx context.Context, _ cell.Health) error {
-                               for event := range in.Resource.Events(ctx) {
-                                       event.Done(nil)
+                               resourceStore, err := in.Resource.Store(ctx)
+                               if err != nil {
+                                       return err
+                               }
+                               logger.Info("Starting synchronization")
 
-                                       if event.Kind == resource.Sync {
-                                               logger.Info("Initial entries successfully received from Kubernetes")
-                                               store.Synced(ctx, synced)
-                                               continue
-                                       }
+                               // Get event channels
+                               resourceEvents := in.Resource.Events(ctx)
+                               var namespaceEvents <-chan resource.Event[*slim_corev1.Namespace]
+                               if in.Options.Namespaced {
+                                       logger.Debug("Namespace watcher is enabled for resource type")
+                                       namespaceEvents = in.Namespaces.Events(ctx)
+                               } else {
+                                       logger.Debug("Namespace watcher is not enabled for resource type")
+                               }
 
-                                       process := func(op, key string, do func() error) {
-                                               logger.Info("Updating resource in etcd",
-                                                       logfields.Operation, op,
-                                                       logfields.Key, key,
-                                               )
-
-                                               if err := do(); err != nil {
-                                                       logger.Warn("Failed updating resource in etcd",
-                                                               logfields.Error, err,
-                                                               logfields.Operation, op,
-                                                               logfields.Key, key,
-                                                       )
+                               for resourceEvents != nil || namespaceEvents != nil {
+                                       select {
+                                       case event, ok := <-resourceEvents:
+                                               if !ok {
+                                                       resourceEvents = nil
+                                                       continue
                                                }
-                                       }
 
-                                       upserts, deletes := in.Converter.Convert(event)
-                                       for upsert := range upserts {
-                                               process("upsert", upsert.GetKeyName(), func() error { return store.UpsertKey(ctx, upsert) })
-                                       }
-                                       for delete := range deletes {
-                                               process("delete", delete.GetKeyName(), func() error { return store.DeleteKey(ctx, delete) })
+                                               if event.Kind == resource.Sync {
+                                                       event.Done(nil)
+                                                       logger.Info("Initial entries successfully received from Kubernetes")
+                                                       store.Synced(ctx, synced)
+                                                       continue
+                                               }
+                                               // Filter the event based on namespace global status.
+                                               // Only required for certain resource types.
+                                               // Ignore delete events as they should always be processed.
+                                               if event.Kind != resource.Delete && in.Options.Namespaced {
+                                                       ns := in.Namespacer.ExtractNamespace(event)
+                                                       if ns == "" {
+                                                               logger.Error("Failed to determine namespace for resource event, skipping",
+                                                                       logfields.Name, event.Key.Name,
+                                                               )
+                                                               // No way to process this event, just mark done and continue.
+                                                               event.Done(nil)
+                                                               continue
+                                                       }
+                                                       isGlobal, err := in.NamespaceManager.IsGlobalNamespaceByName(ns)
+                                                       if err != nil {
+                                                               logger.Warn("Failed to determine if namespace is global",
+                                                                       logfields.Error, err,
+                                                                       logfields.K8sNamespace, ns,
+                                                               )
+                                                               // Retry this as it might succeed later.
+                                                               event.Done(err)
+                                                               continue
+                                                       }
+                                                       if !isGlobal {
+                                                               logger.Debug("Deleting resource event as it is not in a global namespace",
+                                                                       logfields.Name, event.Key.Name,
+                                                                       logfields.K8sNamespace, ns,
+                                                               )
+                                                               // Handle resources transitioning out of global namespaces.
+                                                               // If a resource was previously in a global namespace and is now
+                                                               // in a non-global namespace (e.g.,mutable fields like in CiliumEndpointSlice),
+                                                               // we need to delete it from kvstore. Convert the event to a delete to ensure cleanup.
+                                                               // event.Done will be called later during normal processing.
+                                                               event.Kind = resource.Delete
+                                                       }
+                                               }
+
+                                               // No possible errors past this point.
+                                               event.Done(nil)
+
+                                               upserts, deletes := in.Converter.Convert(event)
+                                               for upsert := range upserts {
+                                                       process("resource-event", "upsert", upsert.GetKeyName(), func() error { return store.UpsertKey(ctx, upsert) })
+                                               }
+                                               for delete := range deletes {
+                                                       process("resource-event", "delete", delete.GetKeyName(), func() error { return store.DeleteKey(ctx, delete) })
+                                               }
+                                       case event, ok := <-namespaceEvents:
+                                               if !ok {
+                                                       namespaceEvents = nil
+                                                       continue
+                                               }
+                                               event.Done(nil)
+                                               for resEvent := range namespaceHandler(in, resourceStore, event) {
+                                                       upserts, deletes := in.Converter.Convert(resEvent)
+                                                       for upsert := range upserts {
+                                                               process("namespace-event", "upsert", upsert.GetKeyName(), func() error { return store.UpsertKey(ctx, upsert) })
+                                                       }
+                                                       for delete := range deletes {
+                                                               process("namespace-event", "delete", delete.GetKeyName(), func() error { return store.DeleteKey(ctx, delete) })
+                                                       }
+                                               }
                                        }
                                }
+                               logger.Info("Stopping synchronization")
                                return nil
                        },
                ),
@@ -116,3 +215,45 @@ func RegisterSynchronizer[T runtime.Object](in syncParams[T]) {
                ),
        )
 }
+
+// namespaceHandler handles namespace events to resynchronize resources
+// associated with the namespace based on whether it is global or not.
+// Return an iterator of events to be processed.
+func namespaceHandler[T runtime.Object](
+       in syncParams[T], rs resource.Store[T],
+       event resource.Event[*slim_corev1.Namespace]) iter.Seq[resource.Event[T]] {
+       return func(yield func(resource.Event[T]) bool) {
+               if event.Kind == resource.Sync {
+                       return
+               }
+               isGlobal := in.NamespaceManager.IsGlobalNamespaceByObject(event.Object)
+
+               // Sync all entries in the Resource store to reflect the namespace change.
+               objects, err := rs.ByIndex(cache.NamespaceIndex, event.Key.Name)
+               if err != nil {
+                       in.Logger.Warn("Failed to list resources for namespace update",
+                               logfields.Error, err,
+                       )
+                       return
+               }
+               for _, obj := range objects {
+                       resEvent := resource.Event[T]{
+                               Key:    resource.NewKey(obj),
+                               Object: obj,
+                       }
+                       // Determine the event kind. If namespace is being deleted,
+                       // all associated resources should be deleted.
+                       // If namespace is upserted/updated and is global,
+                       // resources should be upserted. Otherwise, they should be deleted
+                       // (ex: annotated non-global from global).
+                       if event.Kind == resource.Upsert && isGlobal {
+                               resEvent.Kind = resource.Upsert
+                       } else {
+                               resEvent.Kind = resource.Delete
+                       }
+                       if !yield(resEvent) {
+                               return
+                       }
+               }
+       }
+}