This adds a new flag, --only-masquerade-default-pool, which disables masquerading for pods whose IPs are allocated from non-default pools.
A per-pool annotation (ipam.cilium.io/skip-masquerade) is also added to configure this behaviour in a more granular way.
This can be particularly useful with the newly supported combination of multi-pool IPAM and tunneling, where the default pool is tunneled while additional pools are routed natively (for example, in conjunction with the BGP control plane).
The feature is backed by a new StateDB table for CiliumPodIPPools. At IP allocation time, the chosen pool is looked up in StateDB to check for the skip-masquerade annotation.
Signed-off-by: alimehrabikoshki <alimehrabi97@gmail.com>
--node-port-bind-protection Reject application bind(2) requests to service ports in the NodePort range (default true)
--node-port-range strings Set the min/max NodePort port range (default [30000,32767])
--nodeport-addresses strings A whitelist of CIDRs to limit which IPs are used for NodePort. If not set, primary IPv4 and/or IPv6 address of each native device is used.
+ --only-masquerade-default-pool When using multi-pool IPAM, only masquerade flows from the default IP pool. This will preserve source IPs for pods from non-default IP pools. Useful when combining multi-pool IPAM with BGP control plane. This option must be combined with enable-bpf-masquerade.
--policy-accounting Enable policy accounting (default true)
--policy-audit-mode Enable policy audit (non-drop) mode
--policy-cidr-match-mode strings The entities that can be selected by CIDR policy. Supported values: 'nodes'
--node-encryption-opt-out-labels string Label selector for nodes which will opt-out of node-to-node encryption (default "node-role.kubernetes.io/control-plane")
--node-port-range strings Set the min/max NodePort port range (default [30000,32767])
--nodeport-addresses strings A whitelist of CIDRs to limit which IPs are used for NodePort. If not set, primary IPv4 and/or IPv6 address of each native device is used.
+ --only-masquerade-default-pool When using multi-pool IPAM, only masquerade flows from the default IP pool. This will preserve source IPs for pods from non-default IP pools. Useful when combining multi-pool IPAM with BGP control plane. This option must be combined with enable-bpf-masquerade.
--policy-default-local-cluster Control whether policy rules assume by default the local cluster if not explicitly selected (default true)
--policy-queue-size uint Size of queue for policy-related events (default 100)
--policy-secrets-namespace string          PolicySecretsNamespace is the namespace having secrets used in CNP and CCNP
--node-encryption-opt-out-labels string Label selector for nodes which will opt-out of node-to-node encryption (default "node-role.kubernetes.io/control-plane")
--node-port-range strings Set the min/max NodePort port range (default [30000,32767])
--nodeport-addresses strings A whitelist of CIDRs to limit which IPs are used for NodePort. If not set, primary IPv4 and/or IPv6 address of each native device is used.
+ --only-masquerade-default-pool When using multi-pool IPAM, only masquerade flows from the default IP pool. This will preserve source IPs for pods from non-default IP pools. Useful when combining multi-pool IPAM with BGP control plane. This option must be combined with enable-bpf-masquerade.
--policy-default-local-cluster Control whether policy rules assume by default the local cluster if not explicitly selected (default true)
--policy-queue-size uint Size of queue for policy-related events (default 100)
--policy-secrets-namespace string          PolicySecretsNamespace is the namespace having secrets used in CNP and CCNP
the ``autoDirectNodeRoutes`` Helm option can be used to enable automatic routing
between nodes on a L2 network.
+Masquerade Behaviour
+--------------------
+
+When combining multi-pool IPAM with the BGP control plane, you may not want to masquerade
+connections originating from certain pools. Since pod IPs are advertised via BGP to your
+underlay network and return traffic can find its way back, it may be undesirable for the
+pod source IP to be masqueraded as the IP of the node it runs on.
+
+It is not always possible to identify pods that should not be masqueraded by destination
+IP alone (via the ``--ipvX-native-routing-cidr`` flag or ``ip-masq-agent`` rules), as the
+destinations reached by masqueraded and non-masqueraded pods may overlap. In such cases,
+when eBPF-based masquerading is enabled, you can exclude IP pools from masquerading with
+the ``--only-masquerade-default-pool`` flag, which disables masquerading for all
+non-default pools. Alternatively, you can configure this on a per-pool basis by annotating
+the CiliumPodIPPool resource with ``ipam.cilium.io/skip-masquerade="true"``.
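+
+For example, a pool annotated as follows would have masquerading skipped for its pods.
+This is a minimal sketch; the pool name and CIDR are illustrative:
+
+.. code-block:: yaml
+
+    apiVersion: cilium.io/v2alpha1
+    kind: CiliumPodIPPool
+    metadata:
+      name: no-masq-pool
+      annotations:
+        ipam.cilium.io/skip-masquerade: "true"
+    spec:
+      ipv4:
+        cidrs:
+        - 10.20.0.0/16
+        maskSize: 27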
+
+With either the flag or the annotation, the source IP of your pods is preserved when they
+connect to endpoints outside the cluster, allowing them to be differentiated from pods in
+other pools on your underlay network. Traffic from these pods can then be matched by
+firewall or NAT rules on your network infrastructure.
+
+Changing either the flag or the annotation after a pod has been allocated an IP will not
+change the masquerade behaviour for that pod until it has been rescheduled.
+
.. _ipam_crd_multi_pool_limitations:
Limitations
// MAC of master interface if address is a slave/secondary of a master interface
MasterMac string `json:"master-mac,omitempty"`
+
+ // SkipMasquerade indicates whether the datapath should avoid masquerading connections from this IP.
+ //
+ SkipMasquerade bool `json:"skip-masquerade,omitempty"`
}
// Validate validates this IP a m address response
type: string
description: |
InterfaceNumber is a field for generically identifying an interface. This is only useful in ENI mode.
+ skip-masquerade:
+ type: boolean
+ description: |
+ SkipMasquerade indicates whether the datapath should avoid masquerading connections from this IP.
AddressPair:
description: Addressing information of an endpoint
type: object
"master-mac": {
"description": "MAC of master interface if address is a slave/secondary of a master interface",
"type": "string"
+ },
+ "skip-masquerade": {
+ "description": "SkipMasquerade indicates whether the datapath should avoid masquerading connections from this IP.\n",
+ "type": "boolean"
}
}
},
"master-mac": {
"description": "MAC of master interface if address is a slave/secondary of a master interface",
"type": "string"
+ },
+ "skip-masquerade": {
+ "description": "SkipMasquerade indicates whether the datapath should avoid masquerading connections from this IP.\n",
+ "type": "boolean"
}
}
},
#define ENDPOINT_F_HOST 1 /* Special endpoint representing local host */
#define ENDPOINT_F_ATHOSTNS 2 /* Endpoint located at the host networking namespace */
+#define ENDPOINT_F_NO_SNAT_V4 4 /* Endpoint should not be masqueraded for IPv4 */
+#define ENDPOINT_F_NO_SNAT_V6 8 /* Endpoint should not be masqueraded for IPv6 */
#define ENDPOINT_MASK_HOST_DELIVERY (ENDPOINT_F_HOST | ENDPOINT_F_ATHOSTNS)
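+/* SNAT is skipped when the endpoint is the local host or carries the
+ * corresponding NO_SNAT flag.
+ */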
+#define ENDPOINT_MASK_SKIP_MASQ_V4 (ENDPOINT_F_HOST | ENDPOINT_F_NO_SNAT_V4)
+#define ENDPOINT_MASK_SKIP_MASQ_V6 (ENDPOINT_F_HOST | ENDPOINT_F_NO_SNAT_V6)
/* Value of endpoint map */
struct endpoint_info {
return NAT_PUNT_TO_STACK;
#endif
- /* if this is a localhost endpoint, no SNAT is needed */
- if (local_ep && (local_ep->flags & ENDPOINT_F_HOST))
+ /* Do not SNAT if this is a localhost endpoint or the endpoint
+ * explicitly opts out of SNAT (typically multi-pool IPAM endpoints).
+ */
+ if (local_ep && (local_ep->flags & ENDPOINT_MASK_SKIP_MASQ_V4))
return NAT_PUNT_TO_STACK;
/* Do not SNAT if dst belongs to any ip-masq-agent subnet. */
}
# endif /* IPV6_SNAT_EXCLUSION_DST_CIDR */
- if (local_ep && (local_ep->flags & ENDPOINT_F_HOST))
+ /* Do not SNAT if this is a localhost endpoint or the endpoint
+ * explicitly opts out of SNAT (typically multi-pool IPAM endpoints).
+ */
+ if (local_ep && (local_ep->flags & ENDPOINT_MASK_SKIP_MASQ_V6))
return NAT_PUNT_TO_STACK;
#ifdef ENABLE_IP_MASQ_AGENT_IPV6
test_finish();
}
+
+/* NO_SNAT endpoints should not be masqueraded */
+PKTGEN("tc", "host_bpf_masq_v4_5_no_snat_ep_udp")
+int host_bpf_masq_v4_5_no_snat_ep_udp_pktgen(struct __ctx_buff *ctx)
+{
+ struct pktgen builder;
+ struct udphdr *udp;
+
+ pktgen__init(&builder, ctx);
+
+ /* generate packet from local endpoint to world */
+ udp = pktgen__push_ipv4_udp_packet(&builder,
+ (__u8 *)node_mac, (__u8 *)server_mac,
+ v4_pod_one, SERVER_IP,
+ bpf_htons(12345), SERVER_PORT);
+ if (!udp)
+ return TEST_ERROR;
+
+ pktgen__finish(&builder);
+ return 0;
+}
+
+SETUP("tc", "host_bpf_masq_v4_5_no_snat_ep_udp")
+int host_bpf_masq_v4_5_no_snat_ep_udp_setup(struct __ctx_buff *ctx)
+{
+ /* mark the source IP as a local endpoint with NO_SNAT flag */
+ endpoint_v4_add_entry(v4_pod_one, 0, 0, ENDPOINT_F_NO_SNAT_V4, 0, 0, NULL, NULL);
+
+ return netdev_send_packet(ctx);
+}
+
+CHECK("tc", "host_bpf_masq_v4_5_no_snat_ep_udp")
+int host_bpf_masq_v4_5_no_snat_ep_udp_check(const struct __ctx_buff *ctx)
+{
+ void *data, *data_end;
+ __u32 *status_code;
+
+ test_init();
+
+ data = (void *)(long)ctx_data(ctx);
+ data_end = (void *)(long)ctx->data_end;
+ if (data + sizeof(__u32) > data_end)
+ test_fatal("status code out of bounds");
+ status_code = data;
+ assert(*status_code == CTX_ACT_OK);
+
+ /* traverse data to find the packet */
+ data += sizeof(__u32);
+ if (data + sizeof(struct ethhdr) > data_end)
+ test_fatal("ctx doesn't fit ethhdr");
+ data += sizeof(struct ethhdr);
+ if (data + sizeof(struct iphdr) > data_end)
+ test_fatal("ctx doesn't fit iphdr");
+ struct iphdr *ip4 = data;
+ /* source IP should stay the same */
+ assert(ip4->saddr == v4_pod_one);
+ test_finish();
+}
+
+PKTGEN("tc", "host_bpf_masq_v6_5_no_snat_ep_udp")
+int host_bpf_masq_v6_5_no_snat_ep_udp_pktgen(struct __ctx_buff *ctx)
+{
+ struct pktgen builder;
+ struct udphdr *udp;
+
+ pktgen__init(&builder, ctx);
+
+ /* generate packet from local endpoint to world */
+ udp = pktgen__push_ipv6_udp_packet(&builder,
+ (__u8 *)node_mac, (__u8 *)server_mac,
+ (__u8 *)v6_pod_one, (__u8 *)SERVER_IP_V6,
+ bpf_htons(12345), SERVER_PORT);
+ if (!udp)
+ return TEST_ERROR;
+
+ pktgen__finish(&builder);
+ return 0;
+}
+
+SETUP("tc", "host_bpf_masq_v6_5_no_snat_ep_udp")
+int host_bpf_masq_v6_5_no_snat_ep_udp_setup(struct __ctx_buff *ctx)
+{
+ /* mark the source IP as a local endpoint with NO_SNAT flag */
+ endpoint_v6_add_entry((union v6addr *)v6_pod_one,
+ 0,
+ 0,
+ ENDPOINT_F_NO_SNAT_V6,
+ 0,
+ NULL,
+ NULL);
+
+ return netdev_send_packet(ctx);
+}
+
+CHECK("tc", "host_bpf_masq_v6_5_no_snat_ep_udp")
+int host_bpf_masq_v6_5_no_snat_ep_udp_check(const struct __ctx_buff *ctx)
+{
+ void *data, *data_end;
+ __u32 *status_code;
+
+ test_init();
+
+ data = (void *)(long)ctx_data(ctx);
+ data_end = (void *)(long)ctx->data_end;
+ if (data + sizeof(__u32) > data_end)
+ test_fatal("status code out of bounds");
+ status_code = data;
+ assert(*status_code == CTX_ACT_OK);
+
+ data += sizeof(__u32);
+ if (data + sizeof(struct ethhdr) > data_end)
+ test_fatal("ctx doesn't fit ethhdr");
+ data += sizeof(struct ethhdr);
+ if (data + sizeof(struct ipv6hdr) > data_end)
+ test_fatal("ctx doesn't fit ipv6hdr");
+ /* source IP should stay the same */
+ struct ipv6hdr *ip6 = data;
+ {
+ const __u8 expected_v6_pod_one[16] = v6_pod_one_addr;
+
+ assert(!memcmp(&ip6->saddr, expected_v6_pod_one, 16));
+ }
+
+ test_finish();
+}
+
+/* Regular endpoints (without NO_SNAT) should be masqueraded */
+PKTGEN("tc", "host_bpf_masq_v4_6_snat_ep_udp")
+int host_bpf_masq_v4_6_snat_ep_udp_pktgen(struct __ctx_buff *ctx)
+{
+ struct pktgen builder;
+ struct udphdr *udp;
+
+ pktgen__init(&builder, ctx);
+
+ /* generate packet from local endpoint to world */
+ udp = pktgen__push_ipv4_udp_packet(&builder,
+ (__u8 *)node_mac, (__u8 *)server_mac,
+ v4_pod_one, SERVER_IP,
+ bpf_htons(12345), SERVER_PORT);
+ if (!udp)
+ return TEST_ERROR;
+
+ pktgen__finish(&builder);
+ return 0;
+}
+
+SETUP("tc", "host_bpf_masq_v4_6_snat_ep_udp")
+int host_bpf_masq_v4_6_snat_ep_udp_setup(struct __ctx_buff *ctx)
+{
+ /* mark the source IP as a local endpoint without NO_SNAT flag */
+ endpoint_v4_add_entry(v4_pod_one, 0, 0, 0, 0, 0, NULL, NULL);
+
+ return netdev_send_packet(ctx);
+}
+
+CHECK("tc", "host_bpf_masq_v4_6_snat_ep_udp")
+int host_bpf_masq_v4_6_snat_ep_udp_check(const struct __ctx_buff *ctx)
+{
+ void *data, *data_end;
+ __u32 *status_code;
+
+ test_init();
+
+ data = (void *)(long)ctx_data(ctx);
+ data_end = (void *)(long)ctx->data_end;
+ if (data + sizeof(__u32) > data_end)
+ test_fatal("status code out of bounds");
+ status_code = data;
+ assert(*status_code == CTX_ACT_OK);
+
+ data += sizeof(__u32);
+ if (data + sizeof(struct ethhdr) > data_end)
+ test_fatal("ctx doesn't fit ethhdr");
+ data += sizeof(struct ethhdr);
+ if (data + sizeof(struct iphdr) > data_end)
+ test_fatal("ctx doesn't fit iphdr");
+ struct iphdr *ip4 = data;
+ /* source IP should be masqueraded to NODE_IP */
+ assert(ip4->saddr == NODE_IP);
+ test_finish();
+}
+
+PKTGEN("tc", "host_bpf_masq_v6_6_snat_ep_udp")
+int host_bpf_masq_v6_6_snat_ep_udp_pktgen(struct __ctx_buff *ctx)
+{
+ struct pktgen builder;
+ struct udphdr *udp;
+
+ pktgen__init(&builder, ctx);
+
+ /* generate packet from local endpoint to world */
+ udp = pktgen__push_ipv6_udp_packet(&builder,
+ (__u8 *)node_mac, (__u8 *)server_mac,
+ (__u8 *)v6_pod_one, (__u8 *)SERVER_IP_V6,
+ bpf_htons(12345), SERVER_PORT);
+ if (!udp)
+ return TEST_ERROR;
+
+ pktgen__finish(&builder);
+ return 0;
+}
+
+SETUP("tc", "host_bpf_masq_v6_6_snat_ep_udp")
+int host_bpf_masq_v6_6_snat_ep_udp_setup(struct __ctx_buff *ctx)
+{
+ /* mark the source IP as a local endpoint without NO_SNAT flag */
+ endpoint_v6_add_entry((union v6addr *)v6_pod_one, 0, 0, 0, 0, NULL, NULL);
+
+ return netdev_send_packet(ctx);
+}
+
+CHECK("tc", "host_bpf_masq_v6_6_snat_ep_udp")
+int host_bpf_masq_v6_6_snat_ep_udp_check(const struct __ctx_buff *ctx)
+{
+ void *data, *data_end;
+ __u32 *status_code;
+
+ test_init();
+
+ data = (void *)(long)ctx_data(ctx);
+ data_end = (void *)(long)ctx->data_end;
+ if (data + sizeof(__u32) > data_end)
+ test_fatal("status code out of bounds");
+ status_code = data;
+ assert(*status_code == CTX_ACT_OK);
+
+ data += sizeof(__u32);
+ if (data + sizeof(struct ethhdr) > data_end)
+ test_fatal("ctx doesn't fit ethhdr");
+ data += sizeof(struct ethhdr);
+ if (data + sizeof(struct ipv6hdr) > data_end)
+ test_fatal("ctx doesn't fit ipv6hdr");
+ /* source IP should be masqueraded to NODE_IP_V6 */
+ struct ipv6hdr *ip6 = data;
+ {
+ const __u8 expected_node_ip_v6[16] = v6_node_one_addr;
+
+ assert(!memcmp(&ip6->saddr, expected_node_ip_v6, 16));
+ }
+
+ test_finish();
+}
// default pool when no pool selectors match. Can be set on pods or namespaces.
IPAMRequirePoolMatch = IPAMPrefix + "/require-pool-match"
+ // IPAMSkipMasquerade indicates whether the datapath should avoid masquerading
+ // connections from this IP pool when the cluster is in tunneling mode.
+ IPAMSkipMasquerade = IPAMPrefix + "/skip-masquerade"
+
LBIPAMIPsKey = LBIPAMPrefix + "/ips"
LBIPAMIPKeyAlias = Prefix + "/lb-ipam-ips"
return ep.atHostNS
}
+func (ep *epInfoCache) SkipMasqueradeV4() bool {
+ return ep.isProperty(PropertySkipMasqueradeV4)
+}
+
+func (ep *epInfoCache) SkipMasqueradeV6() bool {
+ return ep.isProperty(PropertySkipMasqueradeV6)
+}
+
+// isProperty reports whether the given property key is present in the properties
+// map, holds a boolean, and is set to true.
+func (ep *epInfoCache) isProperty(propertyKey string) bool {
+ if v, ok := ep.properties[propertyKey]; ok {
+ isSet, ok := v.(bool)
+ return ok && isSet
+ }
+ return false
+}
+
func (ep *epInfoCache) GetPropertyValue(key string) any {
return ep.properties[key]
}
// PropertyCEPName contains the CEP name for this endpoint.
PropertyCEPName = "property-cep-name"
+
+ // PropertySkipMasqueradeV4 will mark the endpoint to skip IPv4 masquerade.
+ PropertySkipMasqueradeV4 = "property-skip-masquerade-v4"
+ // PropertySkipMasqueradeV6 will mark the endpoint to skip IPv6 masquerade.
+ PropertySkipMasqueradeV6 = "property-skip-masquerade-v6"
)
var (
return e.isHost
}
+func (e *Endpoint) SkipMasqueradeV4() bool {
+ return e.isProperty(PropertySkipMasqueradeV4)
+}
+
+func (e *Endpoint) SkipMasqueradeV6() bool {
+ return e.isProperty(PropertySkipMasqueradeV6)
+}
+
// SetIsHost is a convenient method to create host endpoints for testing.
func (ep *Endpoint) SetIsHost(isHost bool) {
ep.isHost = isHost
func TestAllocatedIPDump(t *testing.T) {
fakeAddressing := fakeTypes.NewNodeAddressing()
localNodeStore := node.NewTestLocalNodeStore(node.LocalNode{})
- ipam := NewIPAM(hivetest.Logger(t), fakeAddressing, testConfiguration, &ownerMock{}, localNodeStore, &ownerMock{}, &resourceMock{}, &mtuMock, nil, nil, nil, nil)
+ ipam := NewIPAM(NewIPAMParams{
+ Logger: hivetest.Logger(t),
+ NodeAddressing: fakeAddressing,
+ AgentConfig: testConfiguration,
+ NodeDiscovery: &ownerMock{},
+ LocalNodeStore: localNodeStore,
+ K8sEventReg: &ownerMock{},
+ NodeResource: &resourceMock{},
+ MTUConfig: &mtuMock,
+ })
ipam.ConfigureAllocator()
allocv4, allocv6, status := ipam.Dump()
fakeAddressing := fakeTypes.NewNodeAddressing()
localNodeStore := node.NewTestLocalNodeStore(node.LocalNode{})
- ipam := NewIPAM(hivetest.Logger(t), fakeAddressing, testConfiguration, &ownerMock{}, localNodeStore, &ownerMock{}, &resourceMock{}, &mtuMock, nil, nil, nil, nil)
+ ipam := NewIPAM(NewIPAMParams{
+ Logger: hivetest.Logger(t),
+ NodeAddressing: fakeAddressing,
+ AgentConfig: testConfiguration,
+ NodeDiscovery: &ownerMock{},
+ LocalNodeStore: localNodeStore,
+ K8sEventReg: &ownerMock{},
+ NodeResource: &resourceMock{},
+ MTUConfig: &mtuMock,
+ })
ipam.ConfigureAllocator()
err := ipam.AllocateIP(ip, "foo", PoolDefault())
fakeAddressing := fakeTypes.NewNodeAddressing()
localNodeStore := node.NewTestLocalNodeStore(node.LocalNode{})
fakeMetadata := fakeMetadataFunc(func(owner string, family Family) (pool string, err error) { return "some-pool", nil })
- ipam := NewIPAM(hivetest.Logger(t), fakeAddressing, testConfiguration, &ownerMock{}, localNodeStore, &ownerMock{}, &resourceMock{}, &mtuMock, nil, fakeMetadata, nil, nil)
+ ipam := NewIPAM(NewIPAMParams{
+ Logger: hivetest.Logger(t),
+ NodeAddressing: fakeAddressing,
+ AgentConfig: testConfiguration,
+ NodeDiscovery: &ownerMock{},
+ LocalNodeStore: localNodeStore,
+ K8sEventReg: &ownerMock{},
+ NodeResource: &resourceMock{},
+ MTUConfig: &mtuMock,
+ Metadata: fakeMetadata,
+ })
ipam.ConfigureAllocator()
// Allocate IPs and test expiration timer. 'pool' is empty in order to test
Gateway: ipv4Result.GatewayIP,
ExpirationUUID: ipv4Result.ExpirationUUID,
InterfaceNumber: ipv4Result.InterfaceNumber,
+ SkipMasquerade: ipv4Result.SkipMasquerade,
}
}
Gateway: ipv6Result.GatewayIP,
ExpirationUUID: ipv6Result.ExpirationUUID,
InterfaceNumber: ipv6Result.InterfaceNumber,
+ SkipMasquerade: ipv6Result.SkipMasquerade,
}
}
package ipamcell
import (
+ "fmt"
"log/slog"
"github.com/cilium/hive/cell"
+ "github.com/cilium/statedb"
+ "github.com/spf13/pflag"
ipamrestapi "github.com/cilium/cilium/api/v1/server/restapi/ipam"
"github.com/cilium/cilium/daemon/k8s"
"github.com/cilium/cilium/pkg/ipam"
ipamapi "github.com/cilium/cilium/pkg/ipam/api"
ipamMetadata "github.com/cilium/cilium/pkg/ipam/metadata"
+ "github.com/cilium/cilium/pkg/ipam/podippool"
"github.com/cilium/cilium/pkg/ipmasq"
k8sResources "github.com/cilium/cilium/pkg/k8s"
k8sClient "github.com/cilium/cilium/pkg/k8s/client"
"ipam",
"IP Address Management",
+ cell.Config(defaultIPAMConfig),
+
cell.Provide(newIPAddressManager),
cell.Provide(newIPAMAPIHandler),
cell.Provide(k8sResources.CiliumPodIPPoolResource),
+ podippool.TableCell,
// IPAM metadata manager, determines which IPAM pool a pod should allocate from
ipamMetadata.Cell,
)
+type ipamConfig struct {
+ OnlyMasqueradeDefaultPool bool
+}
+
+var defaultIPAMConfig = ipamConfig{
+ OnlyMasqueradeDefaultPool: false,
+}
+
+func (def ipamConfig) Flags(flags *pflag.FlagSet) {
+ flags.Bool("only-masquerade-default-pool",
+ defaultIPAMConfig.OnlyMasqueradeDefaultPool,
+ "When using multi-pool IPAM, only masquerade flows from the default IP pool. "+
+ "This will preserve source IPs for pods from non-default IP pools. "+
+ "Useful when combining multi-pool IPAM with BGP control plane. "+
+ "This option must be combined with enable-bpf-masquerade.")
+}
+
type ipamParams struct {
cell.In
Sysctl sysctl.Sysctl
EndpointManager endpointmanager.EndpointManager
IPMasqAgent *ipmasq.IPMasqAgent
+
+ DB *statedb.DB
+ PodIPPools statedb.Table[podippool.LocalPodIPPool]
}
-func newIPAddressManager(params ipamParams) *ipam.IPAM {
- ipam := ipam.NewIPAM(params.Logger, params.NodeAddressing, params.AgentConfig, params.NodeDiscovery, params.LocalNodeStore, params.K8sEventReporter, params.NodeResource, params.MTU, params.Clientset, params.IPAMMetadataManager, params.Sysctl, params.IPMasqAgent)
+func newIPAddressManager(params ipamParams, c ipamConfig) (*ipam.IPAM, error) {
+ if c.OnlyMasqueradeDefaultPool && !params.AgentConfig.EnableBPFMasquerade {
+ return nil, fmt.Errorf("--only-masquerade-default-pool requires --enable-bpf-masquerade to be enabled")
+ }
+ ipam := ipam.NewIPAM(ipam.NewIPAMParams{
+ Logger: params.Logger,
+ NodeAddressing: params.NodeAddressing,
+ AgentConfig: params.AgentConfig,
+ NodeDiscovery: params.NodeDiscovery,
+ LocalNodeStore: params.LocalNodeStore,
+ K8sEventReg: params.K8sEventReporter,
+ NodeResource: params.NodeResource,
+ MTUConfig: params.MTU,
+ Clientset: params.Clientset,
+ Metadata: params.IPAMMetadataManager,
+ Sysctl: params.Sysctl,
+ IPMasqAgent: params.IPMasqAgent,
+ DB: params.DB,
+ PodIPPools: params.PodIPPools,
+ OnlyMasqueradeDefaultPool: c.OnlyMasqueradeDefaultPool,
+ })
debug.RegisterStatusObject("ipam", ipam)
params.EndpointManager.Subscribe(ipam)
- return ipam
+ return ipam, nil
}
type ipamAPIHandlerParams struct {
sharedNodeStore.ownNode = cn
localNodeStore := node.NewTestLocalNodeStore(node.LocalNode{})
- ipam := NewIPAM(hivetest.Logger(t), fakeAddressing, conf, &ownerMock{}, localNodeStore, &ownerMock{}, &resourceMock{}, &mtuMock, nil, nil, nil, nil)
+ ipam := NewIPAM(NewIPAMParams{
+ Logger: hivetest.Logger(t),
+ NodeAddressing: fakeAddressing,
+ AgentConfig: conf,
+ NodeDiscovery: &ownerMock{},
+ LocalNodeStore: localNodeStore,
+ K8sEventReg: &ownerMock{},
+ NodeResource: &resourceMock{},
+ MTUConfig: &mtuMock,
+ })
ipam.ConfigureAllocator()
sharedNodeStore.updateLocalNodeResource(cn)
sharedNodeStore.ownNode = cn
localNodeStore := node.NewTestLocalNodeStore(node.LocalNode{})
- ipam := NewIPAM(hivetest.Logger(t), fakeAddressing, conf, &ownerMock{}, localNodeStore, &ownerMock{}, &resourceMock{}, &mtuMock, nil, nil, nil, ipMasqAgent)
+ ipam := NewIPAM(NewIPAMParams{
+ Logger: hivetest.Logger(t),
+ NodeAddressing: fakeAddressing,
+ AgentConfig: conf,
+ NodeDiscovery: &ownerMock{},
+ LocalNodeStore: localNodeStore,
+ K8sEventReg: &ownerMock{},
+ NodeResource: &resourceMock{},
+ MTUConfig: &mtuMock,
+ IPMasqAgent: ipMasqAgent,
+ })
ipam.ConfigureAllocator()
epipv4 := netip.MustParseAddr("10.1.1.226")
sharedNodeStore.ownNode = cn
localNodeStore := node.NewTestLocalNodeStore(node.LocalNode{})
- ipam := NewIPAM(hivetest.Logger(t), fakeAddressing, conf, &ownerMock{}, localNodeStore, &ownerMock{}, &resourceMock{}, &mtuMock, nil, nil, nil, ipMasqAgent)
+ ipam := NewIPAM(NewIPAMParams{
+ Logger: hivetest.Logger(t),
+ NodeAddressing: fakeAddressing,
+ AgentConfig: conf,
+ NodeDiscovery: &ownerMock{},
+ LocalNodeStore: localNodeStore,
+ K8sEventReg: &ownerMock{},
+ NodeResource: &resourceMock{},
+ MTUConfig: &mtuMock,
+ IPMasqAgent: ipMasqAgent,
+ })
ipam.ConfigureAllocator()
epipv4 := netip.MustParseAddr("10.10.1.5")
"log/slog"
"net"
+ "github.com/cilium/statedb"
+
agentK8s "github.com/cilium/cilium/daemon/k8s"
"github.com/cilium/cilium/pkg/datapath/linux/sysctl"
"github.com/cilium/cilium/pkg/datapath/types"
ipamOption "github.com/cilium/cilium/pkg/ipam/option"
+ "github.com/cilium/cilium/pkg/ipam/podippool"
"github.com/cilium/cilium/pkg/ipmasq"
"github.com/cilium/cilium/pkg/k8s/client"
"github.com/cilium/cilium/pkg/logging"
GetIPPoolForPod(owner string, family Family) (pool string, err error)
}
+// NewIPAMParams contains the parameters for creating a new IPAM instance.
+type NewIPAMParams struct {
+ Logger *slog.Logger
+ NodeAddressing types.NodeAddressing
+ AgentConfig *option.DaemonConfig
+ NodeDiscovery Owner
+ LocalNodeStore *node.LocalNodeStore
+ K8sEventReg K8sEventRegister
+ NodeResource agentK8s.LocalCiliumNodeResource
+ MTUConfig MtuConfiguration
+ Clientset client.Clientset
+ Metadata Metadata
+ Sysctl sysctl.Sysctl
+ IPMasqAgent *ipmasq.IPMasqAgent
+
+ DB *statedb.DB
+ PodIPPools statedb.Table[podippool.LocalPodIPPool]
+ OnlyMasqueradeDefaultPool bool
+}
+
// NewIPAM returns a new IP address manager
-func NewIPAM(logger *slog.Logger, nodeAddressing types.NodeAddressing, c *option.DaemonConfig, nodeDiscovery Owner, localNodeStore *node.LocalNodeStore, k8sEventReg K8sEventRegister, node agentK8s.LocalCiliumNodeResource, mtuConfig MtuConfiguration, clientset client.Clientset, metadata Metadata, sysctl sysctl.Sysctl, ipMasqAgent *ipmasq.IPMasqAgent) *IPAM {
+func NewIPAM(params NewIPAMParams) *IPAM {
return &IPAM{
- logger: logger,
- nodeAddressing: nodeAddressing,
- config: c,
- owner: map[Pool]map[string]string{},
- expirationTimers: map[timerKey]expirationTimer{},
- excludedIPs: map[string]string{},
-
- k8sEventReg: k8sEventReg,
- localNodeStore: localNodeStore,
- nodeResource: node,
- mtuConfig: mtuConfig,
- clientset: clientset,
- nodeDiscovery: nodeDiscovery,
- metadata: metadata,
- sysctl: sysctl,
- ipMasqAgent: ipMasqAgent,
+ logger: params.Logger,
+ config: params.AgentConfig,
+ nodeAddressing: params.NodeAddressing,
+ owner: map[Pool]map[string]string{},
+ expirationTimers: map[timerKey]expirationTimer{},
+ excludedIPs: map[string]string{},
+ k8sEventReg: params.K8sEventReg,
+ localNodeStore: params.LocalNodeStore,
+ nodeResource: params.NodeResource,
+ mtuConfig: params.MTUConfig,
+ clientset: params.Clientset,
+ nodeDiscovery: params.NodeDiscovery,
+ metadata: params.Metadata,
+ sysctl: params.Sysctl,
+ ipMasqAgent: params.IPMasqAgent,
+ db: params.DB,
+ podIPPools: params.PodIPPools,
+ onlyMasqueradeDefaultPool: params.OnlyMasqueradeDefaultPool,
}
}
}
case ipamOption.IPAMMultiPool:
ipam.logger.Info("Initializing MultiPool IPAM")
- manager := newMultiPoolManager(ipam.logger, ipam.config, ipam.nodeResource, ipam.nodeDiscovery, ipam.localNodeStore, ipam.clientset.CiliumV2().CiliumNodes())
+ manager := newMultiPoolManager(MultiPoolManagerParams{
+ Logger: ipam.logger,
+ Conf: ipam.config,
+ Node: ipam.nodeResource,
+ Owner: ipam.nodeDiscovery,
+ LocalNodeStore: ipam.localNodeStore,
+ Clientset: ipam.clientset.CiliumV2().CiliumNodes(),
+ DB: ipam.db,
+ PodIPPools: ipam.podIPPools,
+ OnlyMasqueradeDefaultPool: ipam.onlyMasqueradeDefaultPool,
+ })
if ipam.config.IPv6Enabled() {
ipam.IPv6Allocator = manager.Allocator(IPv6)
func TestLock(t *testing.T) {
fakeAddressing := fakeTypes.NewNodeAddressing()
localNodeStore := node.NewTestLocalNodeStore(node.LocalNode{})
- ipam := NewIPAM(hivetest.Logger(t), fakeAddressing, testConfiguration, &ownerMock{}, localNodeStore, &ownerMock{}, &resourceMock{}, &mtuMock, nil, nil, nil, nil)
+ ipam := NewIPAM(NewIPAMParams{
+ Logger: hivetest.Logger(t),
+ NodeAddressing: fakeAddressing,
+ AgentConfig: testConfiguration,
+ NodeDiscovery: &ownerMock{},
+ LocalNodeStore: localNodeStore,
+ K8sEventReg: &ownerMock{},
+ NodeResource: &resourceMock{},
+ MTUConfig: &mtuMock,
+ })
ipam.ConfigureAllocator()
// Since the IPs we have allocated to the endpoints might or might not
func TestExcludeIP(t *testing.T) {
fakeAddressing := fakeTypes.NewNodeAddressing()
localNodeStore := node.NewTestLocalNodeStore(node.LocalNode{})
- ipam := NewIPAM(hivetest.Logger(t), fakeAddressing, testConfiguration, &ownerMock{}, localNodeStore, &ownerMock{}, &resourceMock{}, &mtuMock, nil, nil, nil, nil)
+ ipam := NewIPAM(NewIPAMParams{
+ Logger: hivetest.Logger(t),
+ NodeAddressing: fakeAddressing,
+ AgentConfig: testConfiguration,
+ NodeDiscovery: &ownerMock{},
+ LocalNodeStore: localNodeStore,
+ K8sEventReg: &ownerMock{},
+ NodeResource: &resourceMock{},
+ MTUConfig: &mtuMock,
+ })
ipam.ConfigureAllocator()
ipv4 := fakeIPv4AllocCIDRIP(fakeAddressing)
}
})
- ipam := NewIPAM(hivetest.Logger(t), fakeAddressing, testConfiguration, &ownerMock{}, localNodeStore, &ownerMock{}, &resourceMock{}, &mtuMock, nil, fakeMetadata, nil, nil)
+ ipam := NewIPAM(NewIPAMParams{
+ Logger: hivetest.Logger(t),
+ NodeAddressing: fakeAddressing,
+ AgentConfig: testConfiguration,
+ NodeDiscovery: &ownerMock{},
+ LocalNodeStore: localNodeStore,
+ K8sEventReg: &ownerMock{},
+ NodeResource: &resourceMock{},
+ MTUConfig: &mtuMock,
+ Metadata: fakeMetadata,
+ })
ipam.ConfigureAllocator()
ipam.IPv4Allocator = newFakePoolAllocator(map[string]string{
"default": "10.10.0.0/16",
fakeAddressing := fakeTypes.NewNodeAddressing()
localNodeStore := node.NewTestLocalNodeStore(node.LocalNode{})
fakeMetadata := fakeMetadataFunc(func(owner string, family Family) (pool string, err error) { return "some-pool", nil })
- ipam := NewIPAM(hivetest.Logger(t), fakeAddressing, testConfiguration, &ownerMock{}, localNodeStore, &ownerMock{}, &resourceMock{}, &mtuMock, nil, fakeMetadata, nil, nil)
+ ipam := NewIPAM(NewIPAMParams{
+ Logger: hivetest.Logger(t),
+ NodeAddressing: fakeAddressing,
+ AgentConfig: testConfiguration,
+ NodeDiscovery: &ownerMock{},
+ LocalNodeStore: localNodeStore,
+ K8sEventReg: &ownerMock{},
+ NodeResource: &resourceMock{},
+ MTUConfig: &mtuMock,
+ Metadata: fakeMetadata,
+ })
ipam.ConfigureAllocator()
// AllocateIP requires explicit pool
"strconv"
"sync"
+ "github.com/cilium/statedb"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
agentK8s "github.com/cilium/cilium/daemon/k8s"
+ "github.com/cilium/cilium/pkg/annotation"
"github.com/cilium/cilium/pkg/controller"
+ "github.com/cilium/cilium/pkg/ipam/podippool"
"github.com/cilium/cilium/pkg/ipam/types"
ciliumv2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
"github.com/cilium/cilium/pkg/k8s/resource"
UpdateStatus(ctx context.Context, ciliumNode *ciliumv2.CiliumNode, opts metav1.UpdateOptions) (*ciliumv2.CiliumNode, error)
}
+type MultiPoolManagerParams struct {
+ Logger *slog.Logger
+ Conf *option.DaemonConfig
+ Node agentK8s.LocalCiliumNodeResource
+ Owner Owner
+ LocalNodeStore *node.LocalNodeStore
+ Clientset nodeUpdater
+ DB *statedb.DB
+ PodIPPools statedb.Table[podippool.LocalPodIPPool]
+ OnlyMasqueradeDefaultPool bool
+}
+
type multiPoolManager struct {
mutex *lock.Mutex
conf *option.DaemonConfig
finishedRestore map[Family]bool
logger *slog.Logger
+
+ db *statedb.DB
+ podIPPools statedb.Table[podippool.LocalPodIPPool]
+ onlyMasqueradeDefaultPool bool
}
var _ Allocator = (*multiPoolAllocator)(nil)
-func newMultiPoolManager(logger *slog.Logger, conf *option.DaemonConfig, node agentK8s.LocalCiliumNodeResource, owner Owner, localNodeStore *node.LocalNodeStore, clientset nodeUpdater) *multiPoolManager {
- preallocMap, err := parseMultiPoolPreAllocMap(conf.IPAMMultiPoolPreAllocation)
+func newMultiPoolManager(p MultiPoolManagerParams) *multiPoolManager {
+ preallocMap, err := parseMultiPoolPreAllocMap(p.Conf.IPAMMultiPoolPreAllocation)
if err != nil {
- logging.Fatal(logger, fmt.Sprintf("Invalid %s flag value", option.IPAMMultiPoolPreAllocation), logfields.Error, err)
+ logging.Fatal(p.Logger, fmt.Sprintf("Invalid %s flag value", option.IPAMMultiPoolPreAllocation), logfields.Error, err)
}
k8sController := controller.NewManager()
k8sUpdater, err := trigger.NewTrigger(trigger.Parameters{
- MinInterval: conf.IPAMCiliumNodeUpdateRate,
+ MinInterval: p.Conf.IPAMCiliumNodeUpdateRate,
TriggerFunc: func(reasons []string) {
k8sController.TriggerController(multiPoolControllerName)
},
Name: multiPoolTriggerName,
})
if err != nil {
- logging.Fatal(logger, "Unable to initialize CiliumNode synchronization trigger", logfields.Error, err)
+ logging.Fatal(p.Logger, "Unable to initialize CiliumNode synchronization trigger", logfields.Error, err)
}
localNodeSynced := make(chan struct{})
c := &multiPoolManager{
- logger: logger,
+ logger: p.Logger,
mutex: &lock.Mutex{},
- owner: owner,
- conf: conf,
+ owner: p.Owner,
+ conf: p.Conf,
preallocatedIPsPerPool: preallocMap,
- pendingIPsPerPool: newPendingAllocationsPerPool(logger),
+ pendingIPsPerPool: newPendingAllocationsPerPool(p.Logger),
pools: map[Pool]*poolPair{},
poolsUpdated: make(chan struct{}, 1),
node: nil,
controller: k8sController,
k8sUpdater: k8sUpdater,
- localNodeStore: localNodeStore,
- nodeUpdater: clientset,
+ localNodeStore: p.LocalNodeStore,
+ nodeUpdater: p.Clientset,
finishedRestore: map[Family]bool{},
localNodeSynced: localNodeSynced,
localNodeSyncedFn: sync.OnceFunc(func() {
close(localNodeSynced)
}),
+ db: p.DB,
+ podIPPools: p.PodIPPools,
+ onlyMasqueradeDefaultPool: p.OnlyMasqueradeDefaultPool,
}
// We don't have a context to use here (as a lot of IPAM doesn't really
// resource and it's processing if IPAM and other subsytems are being
// stopped, there appears to be no such signal available here. Also, don't
// retry events - the downstream code isn't setup to handle retries.
- evs := node.Events(context.TODO(), resource.WithErrorHandler(resource.RetryUpTo(0)))
+ evs := p.Node.Events(context.TODO(), resource.WithErrorHandler(resource.RetryUpTo(0)))
go c.ciliumNodeEventLoop(evs)
- owner.UpdateCiliumNodeResource()
+ p.Owner.UpdateCiliumNodeResource()
c.waitForAllPools()
case <-c.localNodeSynced:
return c
case <-time.After(5 * time.Second):
- logger.Info("Waiting for local CiliumNode resource to synchronize local node store")
+ p.Logger.Info("Waiting for local CiliumNode resource to synchronize local node store")
}
}
}
func (m *multiPoolManager) waitForPool(ctx context.Context, family Family, poolName Pool) (ready bool) {
for {
m.mutex.Lock()
+ poolReady := false
switch family {
case IPv4:
if p, ok := m.pools[poolName]; ok && p.v4 != nil && p.v4.hasAvailableIPs() {
- m.mutex.Unlock()
- return true
+ poolReady = true
}
case IPv6:
if p, ok := m.pools[poolName]; ok && p.v6 != nil && p.v6.hasAvailableIPs() {
- m.mutex.Unlock()
- return true
+ poolReady = true
}
}
m.mutex.Unlock()
+ // ensure the pool is present in stateDB for checking annotations at allocation time
+ txn := m.db.ReadTxn()
+ _, _, dbWatch, found := m.podIPPools.GetWatch(txn, podippool.ByName(string(poolName)))
+ if poolReady && found {
+ return true
+ }
+
select {
case <-ctx.Done():
return false
case <-m.poolsUpdated:
continue
+ case <-dbWatch:
+ continue
case <-time.After(5 * time.Second):
m.logger.Info(
"Waiting for podCIDR pool to become available",
m.pendingIPsPerPool.upsertPendingAllocation(poolName, owner, family)
return nil, &ErrPoolNotReadyYet{poolName: poolName, family: family}
}
+ skipMasq := false
+ // If the flag is set, skip masquerade for all non-default pools
+ if m.onlyMasqueradeDefaultPool && poolName != PoolDefault() {
+ skipMasq = true
+ } else {
+ // Look up the IP pool in StateDB and check whether the skip-masquerade annotation is set
+ txn := m.db.ReadTxn()
+ podIPPool, _, found := m.podIPPools.Get(txn, podippool.ByName(string(poolName)))
+ if !found {
+ m.pendingIPsPerPool.upsertPendingAllocation(poolName, owner, family)
+ return nil, fmt.Errorf("IP pool '%s' not found in stateDB table", string(poolName))
+ }
+ if v, ok := podIPPool.Annotations[annotation.IPAMSkipMasquerade]; ok && v == "true" {
+ skipMasq = true
+ }
+ }
ip, err := pool.allocateNext()
if err != nil {
}
m.pendingIPsPerPool.markAsAllocated(poolName, owner, family)
- return &AllocationResult{IP: ip, IPPoolName: poolName}, nil
+ return &AllocationResult{IP: ip, IPPoolName: poolName, SkipMasquerade: skipMasq}, nil
}
func (m *multiPoolManager) allocateIP(ip net.IP, owner string, poolName Pool, family Family, syncUpstream bool) (*AllocationResult, error) {
"time"
"github.com/cilium/hive/hivetest"
+ "github.com/cilium/statedb"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "github.com/cilium/cilium/pkg/annotation"
"github.com/cilium/cilium/pkg/cidr"
+ "github.com/cilium/cilium/pkg/ipam/podippool"
"github.com/cilium/cilium/pkg/ipam/service/ipallocator"
"github.com/cilium/cilium/pkg/ipam/types"
ciliumv2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
+ k8sv2alpha1 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1"
"github.com/cilium/cilium/pkg/k8s/resource"
"github.com/cilium/cilium/pkg/lock"
"github.com/cilium/cilium/pkg/node"
)
func Test_MultiPoolManager(t *testing.T) {
+ db := statedb.New()
+ poolsTbl, err := podippool.NewTable(db)
+ require.NoError(t, err)
+ insertPool(t, db, poolsTbl, "default", false)
+ insertPool(t, db, poolsTbl, "mars", false)
+ insertPool(t, db, poolsTbl, "jupiter", false)
+
fakeConfig := testConfiguration
// set custom preAllocMap for unit tests
fakeConfig.IPAMMultiPoolPreAllocation = map[string]string{
// waiting for initial local node sync and return
go fakeK8sCiliumNodeAPI.updateNode(currentNode)
- c := newMultiPoolManager(hivetest.Logger(t), fakeConfig, fakeK8sCiliumNodeAPI, fakeOwner, fakeLocalNodeStore, fakeK8sCiliumNodeAPI)
+ c := newMultiPoolManager(MultiPoolManagerParams{
+ Logger: hivetest.Logger(t),
+ Conf: fakeConfig,
+ Node: fakeK8sCiliumNodeAPI,
+ Owner: fakeOwner,
+ LocalNodeStore: fakeLocalNodeStore,
+ Clientset: fakeK8sCiliumNodeAPI,
+ DB: db,
+ PodIPPools: poolsTbl,
+ })
// For testing, we want every trigger to run the controller once
k8sUpdater, err := trigger.NewTrigger(trigger.Parameters{
func Test_MultiPoolManager_ReleaseUnusedCIDR(t *testing.T) {
logger := hivetest.Logger(t)
+ db := statedb.New()
+ poolsTbl, err := podippool.NewTable(db)
+ require.NoError(t, err)
+ insertPool(t, db, poolsTbl, "default", false)
+
fakeConfig := testConfiguration
// disable pre-allocation
fakeConfig.IPAMMultiPoolPreAllocation = map[string]string{}
// Feed initial node to the fake API so that newMultiPoolManager returns immediately
go fakeK8sAPI.updateNode(initialNode)
- mgr := newMultiPoolManager(logger, fakeConfig, fakeK8sAPI, fakeOwner, fakeLocalNodeStore, fakeK8sAPI)
+ mgr := newMultiPoolManager(MultiPoolManagerParams{
+ Logger: logger,
+ Conf: fakeConfig,
+ Node: fakeK8sAPI,
+ Owner: fakeOwner,
+ LocalNodeStore: fakeLocalNodeStore,
+ Clientset: fakeK8sAPI,
+ DB: db,
+ PodIPPools: poolsTbl,
+ })
// Trigger controller immediately when requested by the IPAM trigger
triggerNow, err := trigger.NewTrigger(trigger.Parameters{
func Test_MultiPoolManager_ReleaseUnusedCIDR_PreAlloc(t *testing.T) {
logger := hivetest.Logger(t)
+ db := statedb.New()
+ poolsTbl, err := podippool.NewTable(db)
+ require.NoError(t, err)
+ insertPool(t, db, poolsTbl, "default", false)
+
// preAlloc buffer of 1 for pool "default"
fakeConfig := testConfiguration
fakeConfig.IPAMMultiPoolPreAllocation = map[string]string{
// Feed initial node so that newMultiPoolManager returns immediately
go fakeK8sAPI.updateNode(initialNode)
- mgr := newMultiPoolManager(logger, fakeConfig, fakeK8sAPI, fakeOwner, fakeLocalNodeStore, fakeK8sAPI)
+ mgr := newMultiPoolManager(MultiPoolManagerParams{
+ Logger: logger,
+ Conf: fakeConfig,
+ Node: fakeK8sAPI,
+ Owner: fakeOwner,
+ LocalNodeStore: fakeLocalNodeStore,
+ Clientset: fakeK8sAPI,
+ DB: db,
+ PodIPPools: poolsTbl,
+ })
// Trigger controller immediately when requested
triggerNow, err := trigger.NewTrigger(trigger.Parameters{
return nil
}
+
+func TestAllocateNextFamily_SkipMasquerade(t *testing.T) {
+ db := statedb.New()
+ poolsTbl, err := podippool.NewTable(db)
+ require.NoError(t, err)
+
+ insertPool(t, db, poolsTbl, "default", false)
+ insertPool(t, db, poolsTbl, "blue", false)
+ insertPool(t, db, poolsTbl, "red", true) // skip-masquerade annotation
+ insertPool(t, db, poolsTbl, "green", false)
+
+ // onlyMasqueradeDefaultPool = true, non-default pool
+ mgr := createSkipMasqTestManager(t, db, poolsTbl, true)
+ res, err := mgr.allocateNext("ns/pod", "blue", IPv4, false)
+ require.NoError(t, err)
+ require.True(t, res.SkipMasquerade, "SkipMasquerade should be true for non-default pools when onlyMasqueradeDefaultPool is set")
+
+ // onlyMasqueradeDefaultPool = true, default pool
+ res, err = mgr.allocateNext("ns/pod", "default", IPv4, false)
+ require.NoError(t, err)
+ require.False(t, res.SkipMasquerade, "default pool should always be masqueraded even if global flag set")
+
+ // onlyMasqueradeDefaultPool = false but pool annotated with skip-masquerade
+ mgr = createSkipMasqTestManager(t, db, poolsTbl, false)
+ res, err = mgr.allocateNext("ns/pod", "red", IPv4, false)
+ require.NoError(t, err)
+ require.True(t, res.SkipMasquerade, "SkipMasquerade should be true based on pool annotation")
+
+ // honour annotation on default pool also
+ insertPool(t, db, poolsTbl, "default", true)
+ mgr = createSkipMasqTestManager(t, db, poolsTbl, false)
+ res, err = mgr.allocateNext("ns/pod", "default", IPv4, false)
+ require.NoError(t, err)
+ require.True(t, res.SkipMasquerade, "default pool should not be masqueraded if annotation set")
+
+ // neither flag nor annotation set
+ mgr = createSkipMasqTestManager(t, db, poolsTbl, false)
+ res, err = mgr.allocateNext("ns/pod", "green", IPv4, false)
+ require.NoError(t, err)
+ require.False(t, res.SkipMasquerade, "SkipMasquerade should default to false")
+}
+
+func insertPool(t *testing.T, db *statedb.DB, tbl statedb.RWTable[podippool.LocalPodIPPool], name string, skipMasq bool) {
+ t.Helper()
+ ann := map[string]string{}
+ if skipMasq {
+ ann[annotation.IPAMSkipMasquerade] = "true"
+ }
+
+ poolObj := podippool.LocalPodIPPool{
+ CiliumPodIPPool: &k8sv2alpha1.CiliumPodIPPool{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: name,
+ Annotations: ann,
+ },
+ },
+ UpdatedAt: time.Now(),
+ }
+
+ w := db.WriteTxn(tbl)
+ tbl.Insert(w, poolObj)
+ w.Commit()
+}
+
+func createSkipMasqTestManager(t *testing.T, db *statedb.DB, pools statedb.Table[podippool.LocalPodIPPool], onlyMasqDefault bool) *multiPoolManager {
+ t.Helper()
+
+ fakeConfig := testConfiguration
+ fakeConfig.IPAMMultiPoolPreAllocation = map[string]string{}
+ fakeOwner := &ownerMock{}
+ fakeLocalNodeStore := node.NewTestLocalNodeStore(node.LocalNode{})
+ cnEvents := make(chan resource.Event[*ciliumv2.CiliumNode])
+ fakeK8sAPI := &fakeK8sCiliumNodeAPIResource{
+ c: cnEvents,
+ node: &ciliumv2.CiliumNode{},
+ onUpsertEvent: func(err error) {},
+ onDeleteEvent: func(err error) {},
+ }
+
+ initialNode := &ciliumv2.CiliumNode{
+ ObjectMeta: metav1.ObjectMeta{Name: nodeTypes.GetName()},
+ Spec: ciliumv2.NodeSpec{
+ IPAM: types.IPAMSpec{
+ Pools: types.IPAMPoolSpec{
+ Allocated: []types.IPAMPoolAllocation{
+ {Pool: "default", CIDRs: []types.IPAMPodCIDR{"10.0.0.0/24"}},
+ {Pool: "blue", CIDRs: []types.IPAMPodCIDR{"10.0.1.0/24"}},
+ {Pool: "red", CIDRs: []types.IPAMPodCIDR{"10.0.2.0/24"}},
+ {Pool: "green", CIDRs: []types.IPAMPodCIDR{"10.0.3.0/24"}},
+ },
+ },
+ },
+ },
+ }
+
+ go fakeK8sAPI.updateNode(initialNode)
+
+ mgr := newMultiPoolManager(MultiPoolManagerParams{
+ Logger: hivetest.Logger(t),
+ Conf: fakeConfig,
+ Node: fakeK8sAPI,
+ Owner: fakeOwner,
+ LocalNodeStore: fakeLocalNodeStore,
+ Clientset: fakeK8sAPI,
+ DB: db,
+ PodIPPools: pools,
+ OnlyMasqueradeDefaultPool: onlyMasqDefault,
+ })
+
+ return mgr
+}
--- /dev/null
+// SPDX-License-Identifier: Apache-2.0
+// Copyright Authors of Cilium
+
+package podippool
+
+import (
+ "strconv"
+ "strings"
+
+ "github.com/cilium/hive/cell"
+ "github.com/cilium/hive/job"
+ "github.com/cilium/statedb"
+ "github.com/cilium/statedb/index"
+
+ "github.com/cilium/cilium/pkg/annotation"
+ "github.com/cilium/cilium/pkg/k8s"
+ api_v2alpha1 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1"
+ "github.com/cilium/cilium/pkg/k8s/client"
+ "github.com/cilium/cilium/pkg/k8s/utils"
+ "github.com/cilium/cilium/pkg/time"
+)
+
+// LocalPodIPPool is an internal model of pod IP pools on the cluster
+type LocalPodIPPool struct {
+ *api_v2alpha1.CiliumPodIPPool
+
+ // UpdatedAt is the time when [LocalPodIPPool] was last updated, e.g. it
+ // shows when the pool change was received from the api-server.
+ UpdatedAt time.Time `json:"updatedAt" yaml:"updatedAt"`
+}
+
+func (p LocalPodIPPool) TableHeader() []string {
+ return []string{
+ "Name",
+ "v4CIDRs",
+ "v4MaskSize",
+ "v6CIDRs",
+ "v6MaskSize",
+ "Flags",
+ }
+}
+
+func (p LocalPodIPPool) TableRow() []string {
+ // Handle optional IPv4 specification
+ var v4Cidrs []string
+ var v4MaskSize string
+ if p.Spec.IPv4 != nil {
+ v4Cidrs = make([]string, len(p.Spec.IPv4.CIDRs))
+ for i := range p.Spec.IPv4.CIDRs {
+ v4Cidrs[i] = string(p.Spec.IPv4.CIDRs[i])
+ }
+ v4MaskSize = strconv.FormatUint(uint64(p.Spec.IPv4.MaskSize), 10)
+ } else {
+ v4MaskSize = "-"
+ }
+
+ // Handle optional IPv6 specification
+ var v6Cidrs []string
+ var v6MaskSize string
+ if p.Spec.IPv6 != nil {
+ v6Cidrs = make([]string, len(p.Spec.IPv6.CIDRs))
+ for i := range p.Spec.IPv6.CIDRs {
+ v6Cidrs[i] = string(p.Spec.IPv6.CIDRs[i])
+ }
+ v6MaskSize = strconv.FormatUint(uint64(p.Spec.IPv6.MaskSize), 10)
+ } else {
+ v6MaskSize = "-"
+ }
+
+ flags := []string{}
+ if v, ok := p.ObjectMeta.Annotations[annotation.IPAMSkipMasquerade]; ok && v == "true" {
+ flags = append(flags, "SkipMasquerade=true")
+ }
+
+ return []string{
+ p.Name,
+ formatCIDRs(v4Cidrs),
+ v4MaskSize,
+ formatCIDRs(v6Cidrs),
+ v6MaskSize,
+ strings.Join(flags, ", "),
+ }
+}
+
+func formatCIDRs(cidrs []string) string {
+ if len(cidrs) > 3 {
+ return strings.Join(append(cidrs[:2], "..."), ", ")
+ }
+ return strings.Join(cidrs, ", ")
+}
+
+const (
+ TableName = "podippools"
+ reflectorName = "daemon-k8s"
+)
+
+var (
+ NameIndex = statedb.Index[LocalPodIPPool, string]{
+ Name: "name",
+ FromObject: func(obj LocalPodIPPool) index.KeySet {
+ return index.NewKeySet([]byte(obj.Name))
+ },
+ FromKey: index.String,
+ FromString: index.FromString,
+ Unique: true,
+ }
+
+ ByName = NameIndex.Query
+
+ // TableCell provides the PodIPPool StateDB table and its k8s reflector.
+ TableCell = cell.Module(
+ "ipam-podippool-table",
+ "PodIPPool StateDB Table",
+ cell.Provide(NewTableAndReflector),
+ )
+)
+
+// NewTableAndReflector returns the read-only Table[LocalPodIPPool] and registers
+// its k8s reflector. Providing both together ensures that anything depending on
+// Table[LocalPodIPPool] starts after the reflector, so Start hooks can wait for
+// the table to be initialized.
+func NewTableAndReflector(jg job.Group, db *statedb.DB, cs client.Clientset) (statedb.Table[LocalPodIPPool], error) {
+ pools, err := NewTable(db)
+ if err != nil {
+ return nil, err
+ }
+
+ if !cs.IsEnabled() {
+ return pools, nil
+ }
+
+ cfg := reflectorConfig(cs, pools)
+ err = k8s.RegisterReflector(jg, db, cfg)
+ return pools, err
+}
+
+func NewTable(db *statedb.DB) (statedb.RWTable[LocalPodIPPool], error) {
+ return statedb.NewTable(
+ db,
+ TableName,
+ NameIndex,
+ )
+}
+
+func reflectorConfig(cs client.Clientset, podIPPools statedb.RWTable[LocalPodIPPool]) k8s.ReflectorConfig[LocalPodIPPool] {
+ lw := utils.ListerWatcherWithModifiers(
+ utils.ListerWatcherFromTyped(cs.CiliumV2alpha1().CiliumPodIPPools()),
+ )
+ return k8s.ReflectorConfig[LocalPodIPPool]{
+ Name: reflectorName,
+ Table: podIPPools,
+ ListerWatcher: lw,
+ MetricScope: "PodIPPool",
+ Transform: func(_ statedb.ReadTxn, obj any) (LocalPodIPPool, bool) {
+ pool, ok := obj.(*api_v2alpha1.CiliumPodIPPool)
+ if !ok {
+ return LocalPodIPPool{}, false
+ }
+ return LocalPodIPPool{
+ CiliumPodIPPool: pool,
+ UpdatedAt: time.Now(),
+ }, true
+ },
+ }
+}
--- /dev/null
+// SPDX-License-Identifier: Apache-2.0
+// Copyright Authors of Cilium
+
+package podippool
+
+import (
+ "context"
+ "maps"
+ "testing"
+
+ "github.com/cilium/hive/cell"
+ "github.com/cilium/hive/hivetest"
+ "github.com/cilium/hive/script"
+ "github.com/cilium/hive/script/scripttest"
+ "github.com/cilium/statedb"
+ "github.com/spf13/pflag"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "github.com/cilium/cilium/pkg/hive"
+ k8sClient "github.com/cilium/cilium/pkg/k8s/client/testutils"
+ nodeTypes "github.com/cilium/cilium/pkg/node/types"
+ "github.com/cilium/cilium/pkg/time"
+)
+
+func TestScript(t *testing.T) {
+ now := time.Now
+ time.Now = func() time.Time {
+ return time.Date(2000, 1, 1, 10, 30, 0, 0, time.UTC)
+ }
+ since := time.Since
+ time.Since = func(t time.Time) time.Duration {
+ return time.Minute
+ }
+ t.Cleanup(func() {
+ time.Now = now
+ time.Since = since
+ })
+ t.Setenv("TZ", "")
+ nodeTypes.SetName("testnode")
+
+ log := hivetest.Logger(t)
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ t.Cleanup(cancel)
+ scripttest.Test(t,
+ ctx,
+ func(t testing.TB, args []string) *script.Engine {
+ h := hive.New(
+ k8sClient.FakeClientCell(),
+ TableCell,
+
+ cell.Invoke(
+ func(statedb.Table[LocalPodIPPool]) {},
+ ),
+ )
+
+ flags := pflag.NewFlagSet("", pflag.ContinueOnError)
+ h.RegisterFlags(flags)
+
+ t.Cleanup(func() {
+ assert.NoError(t, h.Stop(log, context.TODO()))
+ })
+ cmds, err := h.ScriptCommands(log)
+ require.NoError(t, err, "ScriptCommands")
+ maps.Insert(cmds, maps.All(script.DefaultCmds()))
+ return &script.Engine{
+ Cmds: cmds,
+ }
+ }, []string{}, "testdata/*.txtar")
+}
--- /dev/null
+#
+# Validate reflection and indexing of Table[LocalPodIPPool]
+#
+
+hive start
+
+# Start state: empty table
+db/empty podippools
+
+# Add pool object
+k8s/add pool.yaml
+
+# Validate table contents and reflector health
+db/cmp podippools pools.table
+db/cmp --grep=reflector-podippools health health.table
+
+# Verify JSON & YAML exports
+db/show --format=json --out=actual.json podippools
+cmp expected.json actual.json
+
+db/show --format=yaml --out=actual.yaml podippools
+cmp expected.yaml actual.yaml
+
+# Verify name index
+db/get --index=name --columns=Name --out=actual.table podippools pool1
+cmp pools_name.table actual.table
+
+-- health.table --
+Module Component Level Message
+ipam-podippool-table job-k8s-reflector-podippools-daemon-k8s OK 1 upserted, 0 deleted, 1 total objects
+-- empty.table --
+Name v4CIDRs v4MaskSize v6CIDRs v6MaskSize Flags
+-- pools.table --
+Name v4CIDRs v4MaskSize v6CIDRs v6MaskSize Flags
+pool1 10.0.0.0/24 28 fd00::/112 120 SkipMasquerade=true
+-- pools_name.table --
+Name
+pool1
+-- expected.json --
+{
+ "kind": "CiliumPodIPPool",
+ "apiVersion": "cilium.io/v2alpha1",
+ "metadata": {
+ "name": "pool1",
+ "resourceVersion": "1",
+ "annotations": {
+ "ipam.cilium.io/skip-masquerade": "true"
+ }
+ },
+ "spec": {
+ "ipv4": {
+ "cidrs": [
+ "10.0.0.0/24"
+ ],
+ "maskSize": 28
+ },
+ "ipv6": {
+ "cidrs": [
+ "fd00::/112"
+ ],
+ "maskSize": 120
+ }
+ },
+ "updatedAt": "2000-01-01T10:30:00Z"
+}
+-- expected.yaml --
+ciliumpodippool:
+ typemeta:
+ kind: CiliumPodIPPool
+ apiversion: cilium.io/v2alpha1
+ objectmeta:
+ name: pool1
+ generatename: ""
+ namespace: ""
+ selflink: ""
+ uid: ""
+ resourceversion: "1"
+ generation: 0
+ creationtimestamp: "0001-01-01T00:00:00Z"
+ deletiontimestamp: null
+ deletiongraceperiodseconds: null
+ labels: {}
+ annotations:
+ ipam.cilium.io/skip-masquerade: "true"
+ ownerreferences: []
+ finalizers: []
+ managedfields: []
+ spec:
+ ipv4:
+ cidrs:
+ - 10.0.0.0/24
+ masksize: 28
+ ipv6:
+ cidrs:
+ - fd00::/112
+ masksize: 120
+ podselector: null
+ namespaceselector: null
+updatedAt: 2000-01-01T10:30:00Z
+-- pool.yaml --
+apiVersion: cilium.io/v2alpha1
+kind: CiliumPodIPPool
+metadata:
+ name: pool1
+ annotations:
+ ipam.cilium.io/skip-masquerade: "true"
+spec:
+ ipv4:
+ cidrs:
+ - 10.0.0.0/24
+ maskSize: 28
+ ipv6:
+ cidrs:
+ - fd00::/112
+ maskSize: 120
"github.com/davecgh/go-spew/spew"
+ "github.com/cilium/statedb"
+
agentK8s "github.com/cilium/cilium/daemon/k8s"
"github.com/cilium/cilium/pkg/datapath/linux/sysctl"
"github.com/cilium/cilium/pkg/datapath/types"
"github.com/cilium/cilium/pkg/endpoint"
+ "github.com/cilium/cilium/pkg/ipam/podippool"
"github.com/cilium/cilium/pkg/ipmasq"
"github.com/cilium/cilium/pkg/k8s/client"
"github.com/cilium/cilium/pkg/lock"
// InterfaceNumber is a field for generically identifying an interface.
// This is only useful in ENI mode.
InterfaceNumber string
+
+ // SkipMasquerade indicates whether the datapath should avoid masquerading connections from this IP when the cluster is in tunneling mode.
+ SkipMasquerade bool
}
// Allocator is the interface for an IP allocator implementation
nodeDiscovery Owner
sysctl sysctl.Sysctl
ipMasqAgent *ipmasq.IPMasqAgent
+
+ db *statedb.DB
+ podIPPools statedb.Table[podippool.LocalPodIPPool]
+
+ onlyMasqueradeDefaultPool bool
}
func (ipam *IPAM) EndpointCreated(ep *endpoint.Endpoint) {}
// EndpointFlagAtHostNS indicates that this endpoint is located at the host networking
// namespace
EndpointFlagAtHostNS = 2
+
+ // EndpointFlagSkipMasqueradeV4 indicates that this endpoint should skip IPv4 masquerade for remote traffic
+ EndpointFlagSkipMasqueradeV4 = 4
+
+ // EndpointFlagSkipMasqueradeV6 indicates that this endpoint should skip IPv6 masquerade for remote traffic
+ EndpointFlagSkipMasqueradeV6 = 8
)
// EndpointFrontend is the interface to implement for an object to synchronize
IPv6Address() netip.Addr
GetIdentity() identity.NumericIdentity
IsAtHostNS() bool
+ // SkipMasqueradeV4 indicates whether this endpoint should skip IPv4 masquerade for remote traffic
+ SkipMasqueradeV4() bool
+ // SkipMasqueradeV6 indicates whether this endpoint should skip IPv6 masquerade for remote traffic
+ SkipMasqueradeV6() bool
}
// getBPFKeys returns all keys which should represent this endpoint in the BPF
if e.IsAtHostNS() {
info.Flags |= EndpointFlagAtHostNS
}
+ if e.SkipMasqueradeV4() {
+ info.Flags |= EndpointFlagSkipMasqueradeV4
+ }
+ if e.SkipMasqueradeV6() {
+ info.Flags |= EndpointFlagSkipMasqueradeV6
+ }
return info, nil
}
datapathOption "github.com/cilium/cilium/pkg/datapath/option"
"github.com/cilium/cilium/pkg/datapath/tables"
"github.com/cilium/cilium/pkg/defaults"
+ "github.com/cilium/cilium/pkg/endpoint"
endpointid "github.com/cilium/cilium/pkg/endpoint/id"
ipamOption "github.com/cilium/cilium/pkg/ipam/option"
"github.com/cilium/cilium/pkg/logging"
ep.Addressing.IPV6 = ipam.Address.IPV6
ep.Addressing.IPV6PoolName = ipam.Address.IPV6PoolName
ep.Addressing.IPV6ExpirationUUID = ipam.IPV6.ExpirationUUID
+ if ipam.IPV6.SkipMasquerade {
+ ep.Properties[endpoint.PropertySkipMasqueradeV6] = true
+ }
ipv6Config, routes, err = prepareIP(ep.Addressing.IPV6, state, int(conf.RouteMTU))
if err != nil {
ep.Addressing.IPV4 = ipam.Address.IPV4
ep.Addressing.IPV4PoolName = ipam.Address.IPV4PoolName
ep.Addressing.IPV4ExpirationUUID = ipam.IPV4.ExpirationUUID
+ if ipam.IPV4.SkipMasquerade {
+ ep.Properties[endpoint.PropertySkipMasqueradeV4] = true
+ }
ipConfig, routes, err = prepareIP(ep.Addressing.IPV4, state, int(conf.RouteMTU))
if err != nil {
K8sUID: string(c.CniArgs.K8S_POD_UID),
ContainerInterfaceName: c.Args.IfName,
DatapathConfiguration: &models.EndpointDatapathConfiguration{},
+ Properties: make(map[string]any),
}
if c.Conf.IpamMode == ipamOption.IPAMDelegatedPlugin {