]> git.feebdaed.xyz Git - 0xmirror/cilium.git/commitdiff
hubble/parser: Make L34 packet parser customizable
authorFabian Fischer <fabian.fischer@isovalent.com>
Wed, 10 Dec 2025 10:55:56 +0000 (11:55 +0100)
committerAndré Martins <aanm@users.noreply.github.com>
Wed, 17 Dec 2025 08:36:12 +0000 (08:36 +0000)
Add the possibility to replace the L34 packet parsing logic through
parser options.

Signed-off-by: Fabian Fischer <fabian.fischer@isovalent.com>
pkg/hubble/parser/options/options.go
pkg/hubble/parser/threefour/parser.go
pkg/hubble/parser/threefour/parser_test.go

index 80d06d78bf83bf0f94b99e914a89140c55f7714a..887896c04ee7812fc6b6d81462a147ed2aefb585 100644 (file)
@@ -4,6 +4,7 @@
 package options
 
 import (
+       "net/netip"
        "strings"
 
        pb "github.com/cilium/cilium/api/v1/flow"
@@ -26,6 +27,7 @@ type Options struct {
        TraceNotifyDecoder         TraceNotifyDecoderFunc
        PolicyVerdictNotifyDecoder PolicyVerdictNotifyDecoderFunc
        TraceSockNotifyDecoder     TraceSockNotifyDecoderFunc
+       L34PacketDecoder           L34PacketDecoder
 }
 
 // HubbleRedactSettings contains all hubble redact related options
@@ -126,6 +128,20 @@ func WithTraceSockNotifyDecoder(decode TraceSockNotifyDecoderFunc) Option {
        }
 }
 
+type L34PacketDecoder interface {
+       DecodePacket(payload []byte, decoded *pb.Flow, isL3Device, isIPv6, isVXLAN, isGeneve bool) (
+               sourceIP, destinationIP netip.Addr,
+               sourcePort, destinationPort uint16,
+               err error,
+       )
+}
+
+func WithL34PacketDecoder(decoder L34PacketDecoder) Option {
+       return func(opt *Options) {
+               opt.L34PacketDecoder = decoder
+       }
+}
+
 func headerSliceToMap(headerList []string) map[string]struct{} {
        headerMap := make(map[string]struct{}, len(headerList))
        for _, header := range headerList {
index a9d75c0cff1b8e50763763b878f822c716a3a26b..c9cb8176498ea29f21124a7e0400ece9aaae54ff 100644 (file)
@@ -41,16 +41,14 @@ type Parser struct {
        traceNotifyDecoder         options.TraceNotifyDecoderFunc
        policyVerdictNotifyDecoder options.PolicyVerdictNotifyDecoderFunc
        debugCaptureDecoder        options.DebugCaptureDecoderFunc
+       packetDecoder              options.L34PacketDecoder
 
        epResolver          *common.EndpointResolver
        correlateL3L4Policy bool
-
-       // TODO: consider using a pool of these
-       packet *packet
 }
 
-// re-usable packet to avoid reallocating gopacket datastructures
-type packet struct {
+// re-usable packetDecoder to avoid reallocating gopacket datastructures
+type packetDecoder struct {
        lock.Mutex
 
        decLayerL2Dev *gopacket.DecodingLayerParser
@@ -103,7 +101,7 @@ func New(
        linkGetter getters.LinkGetter,
        opts ...options.Option,
 ) (*Parser, error) {
-       packet := &packet{}
+       packet := &packetDecoder{}
        decoders := []gopacket.DecodingLayer{
                &packet.Ethernet,
                &packet.IPv4, &packet.IPv6,
@@ -152,6 +150,7 @@ func New(
                        pvn := &monitor.PolicyVerdictNotify{}
                        return pvn, pvn.Decode(data)
                },
+               L34PacketDecoder: packet,
        }
 
        for _, opt := range opts {
@@ -171,7 +170,7 @@ func New(
                traceNotifyDecoder:         args.TraceNotifyDecoder,
                policyVerdictNotifyDecoder: args.PolicyVerdictNotifyDecoder,
                epResolver:                 common.NewEndpointResolver(log, endpointGetter, identityGetter, ipGetter),
-               packet:                     packet,
+               packetDecoder:              args.L34PacketDecoder,
                correlateL3L4Policy:        args.EnableNetworkPolicyCorrelation,
        }, nil
 }
@@ -244,11 +243,12 @@ func (p *Parser) Decode(data []byte, decoded *pb.Flow) error {
        isIPv6 := tn != nil && tn.IsIPv6() || dn != nil && dn.IsIPv6() || pvn != nil && pvn.IsTrafficIPv6()
        isVXLAN := tn != nil && tn.IsVXLAN() || dn != nil && dn.IsVXLAN()
        isGeneve := tn != nil && tn.IsGeneve() || dn != nil && dn.IsGeneve()
-       ether, ip, l4, tunnel, srcIP, dstIP, srcPort, dstPort, summary, err := decodeLayers(data[packetOffset:], p.packet, isL3Device, isIPv6, isVXLAN, isGeneve)
+       srcIP, dstIP, srcPort, dstPort, err := p.packetDecoder.DecodePacket(data[packetOffset:], decoded, isL3Device, isIPv6, isVXLAN, isGeneve)
        if err != nil {
                return err
        }
 
+       ip := decoded.GetIP()
        if tn != nil && ip != nil {
                if !tn.OriginalIP().IsUnspecified() {
                        // Ignore invalid IP - getters will handle invalid value.
@@ -288,10 +288,6 @@ func (p *Parser) Decode(data []byte, decoded *pb.Flow) error {
        decoded.DropReason = decodeDropReason(dn, pvn)
        decoded.DropReasonDesc = pb.DropReason(decoded.DropReason)
        decoded.File = decodeFileInfo(dn)
-       decoded.Ethernet = ether
-       decoded.IP = ip
-       decoded.L4 = l4
-       decoded.Tunnel = tunnel
        decoded.Source = srcEndpoint
        decoded.Destination = dstEndpoint
        decoded.Type = pb.FlowType_L3_L4
@@ -310,7 +306,6 @@ func (p *Parser) Decode(data []byte, decoded *pb.Flow) error {
        decoded.DebugCapturePoint = decodeDebugCapturePoint(dbg)
        decoded.Interface = p.decodeNetworkInterface(tn, dbg)
        decoded.ProxyPort = decodeProxyPort(dbg, tn)
-       decoded.Summary = summary
 
        if p.correlateL3L4Policy && p.endpointGetter != nil {
                correlation.CorrelatePolicy(p.log, p.endpointGetter, decoded)
@@ -327,81 +322,76 @@ func (p *Parser) resolveNames(epID uint32, ip netip.Addr) (names []string) {
        return nil
 }
 
-func decodeLayers(payload []byte, packet *packet, isL3Device, isIPv6, isVXLAN, isGeneve bool) (
-       ethernet *pb.Ethernet,
-       ip *pb.IP,
-       l4 *pb.Layer4,
-       tunnel *pb.Tunnel,
+func (d *packetDecoder) DecodePacket(payload []byte, decoded *pb.Flow, isL3Device, isIPv6, isVXLAN, isGeneve bool) (
        sourceIP, destinationIP netip.Addr,
        sourcePort, destinationPort uint16,
-       summary string,
        err error,
 ) {
-       packet.Lock()
-       defer packet.Unlock()
+       d.Lock()
+       defer d.Unlock()
 
        // Since v1.1.18, DecodeLayers returns a non-nil error for an empty packet, see
        // https://github.com/google/gopacket/issues/846
        // TODO: reconsider this check if the issue is fixed upstream
        if len(payload) == 0 {
                // Truncate layers to avoid accidental re-use.
-               packet.Layers = packet.Layers[:0]
-               packet.overlay.Layers = packet.overlay.Layers[:0]
+               d.Layers = d.Layers[:0]
+               d.overlay.Layers = d.overlay.Layers[:0]
                return
        }
 
        switch {
        case !isL3Device:
-               err = packet.decLayerL2Dev.DecodeLayers(payload, &packet.Layers)
+               err = d.decLayerL2Dev.DecodeLayers(payload, &d.Layers)
        case isIPv6:
-               err = packet.decLayerL3Dev.IPv6.DecodeLayers(payload, &packet.Layers)
+               err = d.decLayerL3Dev.IPv6.DecodeLayers(payload, &d.Layers)
        default:
-               err = packet.decLayerL3Dev.IPv4.DecodeLayers(payload, &packet.Layers)
+               err = d.decLayerL3Dev.IPv4.DecodeLayers(payload, &d.Layers)
        }
 
        if err != nil {
                return
        }
 
-       for _, typ := range packet.Layers {
-               summary = typ.String()
+       for _, typ := range d.Layers {
+               decoded.Summary = typ.String()
                switch typ {
                case layers.LayerTypeEthernet:
-                       ethernet = decodeEthernet(&packet.Ethernet)
+                       decoded.Ethernet = decodeEthernet(&d.Ethernet)
                case layers.LayerTypeIPv4:
-                       ip, sourceIP, destinationIP = decodeIPv4(&packet.IPv4)
+                       decoded.IP, sourceIP, destinationIP = decodeIPv4(&d.IPv4)
                case layers.LayerTypeIPv6:
-                       ip, sourceIP, destinationIP = decodeIPv6(&packet.IPv6)
+                       decoded.IP, sourceIP, destinationIP = decodeIPv6(&d.IPv6)
                case layers.LayerTypeTCP:
-                       l4, sourcePort, destinationPort = decodeTCP(&packet.TCP)
-                       summary = "TCP Flags: " + getTCPFlags(packet.TCP)
+                       decoded.L4, sourcePort, destinationPort = decodeTCP(&d.TCP)
+                       decoded.Summary = "TCP Flags: " + getTCPFlags(d.TCP)
                case layers.LayerTypeUDP:
-                       l4, sourcePort, destinationPort = decodeUDP(&packet.UDP)
+                       decoded.L4, sourcePort, destinationPort = decodeUDP(&d.UDP)
                case layers.LayerTypeSCTP:
-                       l4, sourcePort, destinationPort = decodeSCTP(&packet.SCTP)
+                       decoded.L4, sourcePort, destinationPort = decodeSCTP(&d.SCTP)
                case layers.LayerTypeICMPv4:
-                       l4 = decodeICMPv4(&packet.ICMPv4)
-                       summary = "ICMPv4 " + packet.ICMPv4.TypeCode.String()
+                       decoded.L4 = decodeICMPv4(&d.ICMPv4)
+                       decoded.Summary = "ICMPv4 " + d.ICMPv4.TypeCode.String()
                case layers.LayerTypeICMPv6:
-                       l4 = decodeICMPv6(&packet.ICMPv6)
-                       summary = "ICMPv6 " + packet.ICMPv6.TypeCode.String()
+                       decoded.L4 = decodeICMPv6(&d.ICMPv6)
+                       decoded.Summary = "ICMPv6 " + d.ICMPv6.TypeCode.String()
                case layers.LayerTypeVRRP:
-                       l4 = decodeVRRP(&packet.VRRPv2)
-                       summary = "VRRP " + packet.VRRPv2.Type.String()
+                       decoded.L4 = decodeVRRP(&d.VRRPv2)
+                       decoded.Summary = "VRRP " + d.VRRPv2.Type.String()
                case layers.LayerTypeIGMP:
-                       l4 = decodeIGMP(&packet.IGMPv1or2)
-                       summary = "IGMP " + packet.IGMPv1or2.Type.String()
+                       decoded.L4 = decodeIGMP(&d.IGMPv1or2)
+                       decoded.Summary = "IGMP " + d.IGMPv1or2.Type.String()
                }
        }
 
        switch {
        case isVXLAN:
-               err = packet.decLayerOverlay.VXLAN.DecodeLayers(packet.UDP.Payload, &packet.overlay.Layers)
+               err = d.decLayerOverlay.VXLAN.DecodeLayers(d.UDP.Payload, &d.overlay.Layers)
        case isGeneve:
-               err = packet.decLayerOverlay.Geneve.DecodeLayers(packet.UDP.Payload, &packet.overlay.Layers)
+               err = d.decLayerOverlay.Geneve.DecodeLayers(d.UDP.Payload, &d.overlay.Layers)
        default:
                // Truncate layers to avoid accidental re-use.
-               packet.overlay.Layers = packet.overlay.Layers[:0]
+               d.overlay.Layers = d.overlay.Layers[:0]
                return
        }
 
@@ -411,60 +401,60 @@ func decodeLayers(payload []byte, packet *packet, isL3Device, isIPv6, isVXLAN, i
        }
 
        // Return in case we have not decoded any overlay layer.
-       if len(packet.overlay.Layers) == 0 {
+       if len(d.overlay.Layers) == 0 {
                return
        }
 
        // Expect VXLAN/Geneve overlay as first overlay layer, if not we bail out.
-       switch packet.overlay.Layers[0] {
+       switch d.overlay.Layers[0] {
        case layers.LayerTypeVXLAN:
-               tunnel = &pb.Tunnel{Protocol: pb.Tunnel_VXLAN, IP: ip, L4: l4}
+               decoded.Tunnel = &pb.Tunnel{Protocol: pb.Tunnel_VXLAN, IP: decoded.IP, L4: decoded.L4}
        case layers.LayerTypeGeneve:
-               tunnel = &pb.Tunnel{Protocol: pb.Tunnel_GENEVE, IP: ip, L4: l4}
+               decoded.Tunnel = &pb.Tunnel{Protocol: pb.Tunnel_GENEVE, IP: decoded.IP, L4: decoded.L4}
        default:
                return
        }
 
        // Reset return values. This ensures the resulting flow does not misrepresent
        // what is happening (e.g. same IP addresses for overlay and underlay).
-       ethernet, ip, l4 = nil, nil, nil
+       decoded.Ethernet, decoded.IP, decoded.L4 = nil, nil, nil
        sourceIP, destinationIP = netip.Addr{}, netip.Addr{}
        sourcePort, destinationPort = 0, 0
-       summary = ""
+       decoded.Summary = ""
 
        // Parse the rest of the overlay layers as we would do for a non-encapsulated packet.
        // It is possible we're not parsing any layer here. This is because the overlay
        // decoders failed (e.g., not enough data). We would still return empty values
        // for the inner packet (ethernet, ip, l4, basically the re-init variables)
        // while returning the non-empty `tunnel` field.
-       for _, typ := range packet.overlay.Layers[1:] {
-               summary = typ.String()
+       for _, typ := range d.overlay.Layers[1:] {
+               decoded.Summary = typ.String()
                switch typ {
                case layers.LayerTypeEthernet:
-                       ethernet = decodeEthernet(&packet.overlay.Ethernet)
+                       decoded.Ethernet = decodeEthernet(&d.overlay.Ethernet)
                case layers.LayerTypeIPv4:
-                       ip, sourceIP, destinationIP = decodeIPv4(&packet.overlay.IPv4)
+                       decoded.IP, sourceIP, destinationIP = decodeIPv4(&d.overlay.IPv4)
                case layers.LayerTypeIPv6:
-                       ip, sourceIP, destinationIP = decodeIPv6(&packet.overlay.IPv6)
+                       decoded.IP, sourceIP, destinationIP = decodeIPv6(&d.overlay.IPv6)
                case layers.LayerTypeTCP:
-                       l4, sourcePort, destinationPort = decodeTCP(&packet.overlay.TCP)
-                       summary = "TCP Flags: " + getTCPFlags(packet.overlay.TCP)
+                       decoded.L4, sourcePort, destinationPort = decodeTCP(&d.overlay.TCP)
+                       decoded.Summary = "TCP Flags: " + getTCPFlags(d.overlay.TCP)
                case layers.LayerTypeUDP:
-                       l4, sourcePort, destinationPort = decodeUDP(&packet.overlay.UDP)
+                       decoded.L4, sourcePort, destinationPort = decodeUDP(&d.overlay.UDP)
                case layers.LayerTypeSCTP:
-                       l4, sourcePort, destinationPort = decodeSCTP(&packet.overlay.SCTP)
+                       decoded.L4, sourcePort, destinationPort = decodeSCTP(&d.overlay.SCTP)
                case layers.LayerTypeICMPv4:
-                       l4 = decodeICMPv4(&packet.overlay.ICMPv4)
-                       summary = "ICMPv4 " + packet.overlay.ICMPv4.TypeCode.String()
+                       decoded.L4 = decodeICMPv4(&d.overlay.ICMPv4)
+                       decoded.Summary = "ICMPv4 " + d.overlay.ICMPv4.TypeCode.String()
                case layers.LayerTypeICMPv6:
-                       l4 = decodeICMPv6(&packet.overlay.ICMPv6)
-                       summary = "ICMPv6 " + packet.overlay.ICMPv6.TypeCode.String()
+                       decoded.L4 = decodeICMPv6(&d.overlay.ICMPv6)
+                       decoded.Summary = "ICMPv6 " + d.overlay.ICMPv6.TypeCode.String()
                case layers.LayerTypeVRRP:
-                       l4 = decodeVRRP(&packet.overlay.VRRPv2)
-                       summary = "VRRP " + packet.overlay.VRRPv2.Type.String()
+                       decoded.L4 = decodeVRRP(&d.overlay.VRRPv2)
+                       decoded.Summary = "VRRP " + d.overlay.VRRPv2.Type.String()
                case layers.LayerTypeIGMP:
-                       l4 = decodeIGMP(&packet.overlay.IGMPv1or2)
-                       summary = "IGMP " + packet.overlay.IGMPv1or2.Type.String()
+                       decoded.L4 = decodeIGMP(&d.overlay.IGMPv1or2)
+                       decoded.Summary = "IGMP " + d.overlay.IGMPv1or2.Type.String()
                }
        }
 
index 58910957fbc2c3d0a920d7754a39bcb7c3f4efa3..5680139b5b86273bcb709d6ed6da3043b2b5a163 100644 (file)
@@ -2652,3 +2652,76 @@ func TestDecode_CustomMonitorDecoder(t *testing.T) {
                })
        }
 }
+
+type customDecoder struct {
+}
+
+// DecodePacket implements options.L34PacketDecoder.
+func (c customDecoder) DecodePacket(payload []byte, decoded *flowpb.Flow, isL3Device bool, isIPv6 bool, isVXLAN bool, isGeneve bool) (sourceIP netip.Addr, destinationIP netip.Addr, sourcePort uint16, destinationPort uint16, err error) {
+       sourceIP = netip.MustParseAddr("10.13.37.1")
+       destinationIP = netip.MustParseAddr("10.13.37.2")
+       sourcePort = 10011
+       destinationPort = 443
+
+       decoded.IP = &flowpb.IP{
+               Source:      sourceIP.String(),
+               Destination: destinationIP.String(),
+               IpVersion:   flowpb.IPVersion_IPv4,
+       }
+
+       return
+}
+
+func TestDecode_CustomPacketDecoder(t *testing.T) {
+       event := monitor.TraceNotify{
+               Type:     byte(monitorAPI.MessageTypeTrace),
+               Source:   localEP,
+               ObsPoint: monitorAPI.TraceFromLxc,
+               Reason:   monitor.TraceReasonCtEstablished,
+               Version:  monitor.TraceNotifyVersion2,
+       }
+
+       var l []gopacket.SerializableLayer
+       l = append(l, &layers.Ethernet{SrcMAC: srcMAC, DstMAC: dstMAC, EthernetType: layers.EthernetTypeIPv4}, &layers.IPv4{SrcIP: egressTuple.src.AsSlice(), DstIP: egressTuple.dst.AsSlice()})
+
+       want := &flowpb.Flow{
+               EventType: &flowpb.CiliumEventType{
+                       Type:    4,
+                       SubType: 5,
+               },
+               Type:        flowpb.FlowType_L3_L4,
+               Source:      &flowpb.Endpoint{},
+               Destination: &flowpb.Endpoint{},
+               IP: &flowpb.IP{
+                       IpVersion:   flowpb.IPVersion_IPv4,
+                       Source:      "10.13.37.1",
+                       Destination: "10.13.37.2",
+               },
+               TraceObservationPoint: flowpb.TraceObservationPoint_FROM_ENDPOINT,
+               TraceReason:           flowpb.TraceReason_ESTABLISHED,
+               TrafficDirection:      flowpb.TrafficDirection_INGRESS,
+               IsReply:               wrapperspb.Bool(false),
+               Verdict:               flowpb.Verdict_FORWARDED,
+       }
+
+       data, err := testutils.CreateL3L4Payload(event, l...)
+       if err != nil {
+               t.Fatalf("Unexpected error from CreateL3L4Payload(%T, ...): %v", event, err)
+       }
+
+       parser, err := New(hivetest.Logger(t), defaultEndpointGetter, nil, nil, nil, nil, nil, options.WithL34PacketDecoder(customDecoder{}))
+       require.NoError(t, err)
+
+       got := &flowpb.Flow{}
+       if err := parser.Decode(data, got); err != nil {
+               t.Fatalf("Unexpected error from Decode(data, %T): %v", got, err)
+       }
+
+       opts := []cmp.Option{
+               protocmp.Transform(),
+               protocmp.IgnoreFields(&flowpb.Flow{}, "reply"),
+       }
+       if diff := cmp.Diff(want, got, opts...); diff != "" {
+               t.Errorf("Unexpected diff (-want +got):\n%s", diff)
+       }
+}