traceNotifyDecoder options.TraceNotifyDecoderFunc
policyVerdictNotifyDecoder options.PolicyVerdictNotifyDecoderFunc
debugCaptureDecoder options.DebugCaptureDecoderFunc
+ packetDecoder options.L34PacketDecoder
epResolver *common.EndpointResolver
correlateL3L4Policy bool
-
- // TODO: consider using a pool of these
- packet *packet
}
-// re-usable packet to avoid reallocating gopacket datastructures
-type packet struct {
+// re-usable packetDecoder to avoid reallocating gopacket datastructures
+type packetDecoder struct {
lock.Mutex
decLayerL2Dev *gopacket.DecodingLayerParser
linkGetter getters.LinkGetter,
opts ...options.Option,
) (*Parser, error) {
- packet := &packet{}
+ packet := &packetDecoder{}
decoders := []gopacket.DecodingLayer{
&packet.Ethernet,
&packet.IPv4, &packet.IPv6,
pvn := &monitor.PolicyVerdictNotify{}
return pvn, pvn.Decode(data)
},
+ L34PacketDecoder: packet,
}
for _, opt := range opts {
traceNotifyDecoder: args.TraceNotifyDecoder,
policyVerdictNotifyDecoder: args.PolicyVerdictNotifyDecoder,
epResolver: common.NewEndpointResolver(log, endpointGetter, identityGetter, ipGetter),
- packet: packet,
+ packetDecoder: args.L34PacketDecoder,
correlateL3L4Policy: args.EnableNetworkPolicyCorrelation,
}, nil
}
isIPv6 := tn != nil && tn.IsIPv6() || dn != nil && dn.IsIPv6() || pvn != nil && pvn.IsTrafficIPv6()
isVXLAN := tn != nil && tn.IsVXLAN() || dn != nil && dn.IsVXLAN()
isGeneve := tn != nil && tn.IsGeneve() || dn != nil && dn.IsGeneve()
- ether, ip, l4, tunnel, srcIP, dstIP, srcPort, dstPort, summary, err := decodeLayers(data[packetOffset:], p.packet, isL3Device, isIPv6, isVXLAN, isGeneve)
+ srcIP, dstIP, srcPort, dstPort, err := p.packetDecoder.DecodePacket(data[packetOffset:], decoded, isL3Device, isIPv6, isVXLAN, isGeneve)
if err != nil {
return err
}
+ ip := decoded.GetIP()
if tn != nil && ip != nil {
if !tn.OriginalIP().IsUnspecified() {
// Ignore invalid IP - getters will handle invalid value.
decoded.DropReason = decodeDropReason(dn, pvn)
decoded.DropReasonDesc = pb.DropReason(decoded.DropReason)
decoded.File = decodeFileInfo(dn)
- decoded.Ethernet = ether
- decoded.IP = ip
- decoded.L4 = l4
- decoded.Tunnel = tunnel
decoded.Source = srcEndpoint
decoded.Destination = dstEndpoint
decoded.Type = pb.FlowType_L3_L4
decoded.DebugCapturePoint = decodeDebugCapturePoint(dbg)
decoded.Interface = p.decodeNetworkInterface(tn, dbg)
decoded.ProxyPort = decodeProxyPort(dbg, tn)
- decoded.Summary = summary
if p.correlateL3L4Policy && p.endpointGetter != nil {
correlation.CorrelatePolicy(p.log, p.endpointGetter, decoded)
return nil
}
-func decodeLayers(payload []byte, packet *packet, isL3Device, isIPv6, isVXLAN, isGeneve bool) (
- ethernet *pb.Ethernet,
- ip *pb.IP,
- l4 *pb.Layer4,
- tunnel *pb.Tunnel,
+func (d *packetDecoder) DecodePacket(payload []byte, decoded *pb.Flow, isL3Device, isIPv6, isVXLAN, isGeneve bool) (
sourceIP, destinationIP netip.Addr,
sourcePort, destinationPort uint16,
- summary string,
err error,
) {
- packet.Lock()
- defer packet.Unlock()
+ d.Lock()
+ defer d.Unlock()
// Since v1.1.18, DecodeLayers returns a non-nil error for an empty packet, see
// https://github.com/google/gopacket/issues/846
// TODO: reconsider this check if the issue is fixed upstream
if len(payload) == 0 {
// Truncate layers to avoid accidental re-use.
- packet.Layers = packet.Layers[:0]
- packet.overlay.Layers = packet.overlay.Layers[:0]
+ d.Layers = d.Layers[:0]
+ d.overlay.Layers = d.overlay.Layers[:0]
return
}
switch {
case !isL3Device:
- err = packet.decLayerL2Dev.DecodeLayers(payload, &packet.Layers)
+ err = d.decLayerL2Dev.DecodeLayers(payload, &d.Layers)
case isIPv6:
- err = packet.decLayerL3Dev.IPv6.DecodeLayers(payload, &packet.Layers)
+ err = d.decLayerL3Dev.IPv6.DecodeLayers(payload, &d.Layers)
default:
- err = packet.decLayerL3Dev.IPv4.DecodeLayers(payload, &packet.Layers)
+ err = d.decLayerL3Dev.IPv4.DecodeLayers(payload, &d.Layers)
}
if err != nil {
return
}
- for _, typ := range packet.Layers {
- summary = typ.String()
+ for _, typ := range d.Layers {
+ decoded.Summary = typ.String()
switch typ {
case layers.LayerTypeEthernet:
- ethernet = decodeEthernet(&packet.Ethernet)
+ decoded.Ethernet = decodeEthernet(&d.Ethernet)
case layers.LayerTypeIPv4:
- ip, sourceIP, destinationIP = decodeIPv4(&packet.IPv4)
+ decoded.IP, sourceIP, destinationIP = decodeIPv4(&d.IPv4)
case layers.LayerTypeIPv6:
- ip, sourceIP, destinationIP = decodeIPv6(&packet.IPv6)
+ decoded.IP, sourceIP, destinationIP = decodeIPv6(&d.IPv6)
case layers.LayerTypeTCP:
- l4, sourcePort, destinationPort = decodeTCP(&packet.TCP)
- summary = "TCP Flags: " + getTCPFlags(packet.TCP)
+ decoded.L4, sourcePort, destinationPort = decodeTCP(&d.TCP)
+ decoded.Summary = "TCP Flags: " + getTCPFlags(d.TCP)
case layers.LayerTypeUDP:
- l4, sourcePort, destinationPort = decodeUDP(&packet.UDP)
+ decoded.L4, sourcePort, destinationPort = decodeUDP(&d.UDP)
case layers.LayerTypeSCTP:
- l4, sourcePort, destinationPort = decodeSCTP(&packet.SCTP)
+ decoded.L4, sourcePort, destinationPort = decodeSCTP(&d.SCTP)
case layers.LayerTypeICMPv4:
- l4 = decodeICMPv4(&packet.ICMPv4)
- summary = "ICMPv4 " + packet.ICMPv4.TypeCode.String()
+ decoded.L4 = decodeICMPv4(&d.ICMPv4)
+ decoded.Summary = "ICMPv4 " + d.ICMPv4.TypeCode.String()
case layers.LayerTypeICMPv6:
- l4 = decodeICMPv6(&packet.ICMPv6)
- summary = "ICMPv6 " + packet.ICMPv6.TypeCode.String()
+ decoded.L4 = decodeICMPv6(&d.ICMPv6)
+ decoded.Summary = "ICMPv6 " + d.ICMPv6.TypeCode.String()
case layers.LayerTypeVRRP:
- l4 = decodeVRRP(&packet.VRRPv2)
- summary = "VRRP " + packet.VRRPv2.Type.String()
+ decoded.L4 = decodeVRRP(&d.VRRPv2)
+ decoded.Summary = "VRRP " + d.VRRPv2.Type.String()
case layers.LayerTypeIGMP:
- l4 = decodeIGMP(&packet.IGMPv1or2)
- summary = "IGMP " + packet.IGMPv1or2.Type.String()
+ decoded.L4 = decodeIGMP(&d.IGMPv1or2)
+ decoded.Summary = "IGMP " + d.IGMPv1or2.Type.String()
}
}
switch {
case isVXLAN:
- err = packet.decLayerOverlay.VXLAN.DecodeLayers(packet.UDP.Payload, &packet.overlay.Layers)
+ err = d.decLayerOverlay.VXLAN.DecodeLayers(d.UDP.Payload, &d.overlay.Layers)
case isGeneve:
- err = packet.decLayerOverlay.Geneve.DecodeLayers(packet.UDP.Payload, &packet.overlay.Layers)
+ err = d.decLayerOverlay.Geneve.DecodeLayers(d.UDP.Payload, &d.overlay.Layers)
default:
// Truncate layers to avoid accidental re-use.
- packet.overlay.Layers = packet.overlay.Layers[:0]
+ d.overlay.Layers = d.overlay.Layers[:0]
return
}
}
// Return in case we have not decoded any overlay layer.
- if len(packet.overlay.Layers) == 0 {
+ if len(d.overlay.Layers) == 0 {
return
}
// Expect VXLAN/Geneve overlay as first overlay layer, if not we bail out.
- switch packet.overlay.Layers[0] {
+ switch d.overlay.Layers[0] {
case layers.LayerTypeVXLAN:
- tunnel = &pb.Tunnel{Protocol: pb.Tunnel_VXLAN, IP: ip, L4: l4}
+ decoded.Tunnel = &pb.Tunnel{Protocol: pb.Tunnel_VXLAN, IP: decoded.IP, L4: decoded.L4}
case layers.LayerTypeGeneve:
- tunnel = &pb.Tunnel{Protocol: pb.Tunnel_GENEVE, IP: ip, L4: l4}
+ decoded.Tunnel = &pb.Tunnel{Protocol: pb.Tunnel_GENEVE, IP: decoded.IP, L4: decoded.L4}
default:
return
}
// Reset return values. This ensures the resulting flow does not misrepresent
// what is happening (e.g. same IP addresses for overlay and underlay).
- ethernet, ip, l4 = nil, nil, nil
+ decoded.Ethernet, decoded.IP, decoded.L4 = nil, nil, nil
sourceIP, destinationIP = netip.Addr{}, netip.Addr{}
sourcePort, destinationPort = 0, 0
- summary = ""
+ decoded.Summary = ""
// Parse the rest of the overlay layers as we would do for a non-encapsulated packet.
// It is possible we're not parsing any layer here. This is because the overlay
// decoders failed (e.g., not enough data). We would still return empty values
// for the inner packet (ethernet, ip, l4, basically the re-init variables)
// while returning the non-empty `tunnel` field.
- for _, typ := range packet.overlay.Layers[1:] {
- summary = typ.String()
+ for _, typ := range d.overlay.Layers[1:] {
+ decoded.Summary = typ.String()
switch typ {
case layers.LayerTypeEthernet:
- ethernet = decodeEthernet(&packet.overlay.Ethernet)
+ decoded.Ethernet = decodeEthernet(&d.overlay.Ethernet)
case layers.LayerTypeIPv4:
- ip, sourceIP, destinationIP = decodeIPv4(&packet.overlay.IPv4)
+ decoded.IP, sourceIP, destinationIP = decodeIPv4(&d.overlay.IPv4)
case layers.LayerTypeIPv6:
- ip, sourceIP, destinationIP = decodeIPv6(&packet.overlay.IPv6)
+ decoded.IP, sourceIP, destinationIP = decodeIPv6(&d.overlay.IPv6)
case layers.LayerTypeTCP:
- l4, sourcePort, destinationPort = decodeTCP(&packet.overlay.TCP)
- summary = "TCP Flags: " + getTCPFlags(packet.overlay.TCP)
+ decoded.L4, sourcePort, destinationPort = decodeTCP(&d.overlay.TCP)
+ decoded.Summary = "TCP Flags: " + getTCPFlags(d.overlay.TCP)
case layers.LayerTypeUDP:
- l4, sourcePort, destinationPort = decodeUDP(&packet.overlay.UDP)
+ decoded.L4, sourcePort, destinationPort = decodeUDP(&d.overlay.UDP)
case layers.LayerTypeSCTP:
- l4, sourcePort, destinationPort = decodeSCTP(&packet.overlay.SCTP)
+ decoded.L4, sourcePort, destinationPort = decodeSCTP(&d.overlay.SCTP)
case layers.LayerTypeICMPv4:
- l4 = decodeICMPv4(&packet.overlay.ICMPv4)
- summary = "ICMPv4 " + packet.overlay.ICMPv4.TypeCode.String()
+ decoded.L4 = decodeICMPv4(&d.overlay.ICMPv4)
+ decoded.Summary = "ICMPv4 " + d.overlay.ICMPv4.TypeCode.String()
case layers.LayerTypeICMPv6:
- l4 = decodeICMPv6(&packet.overlay.ICMPv6)
- summary = "ICMPv6 " + packet.overlay.ICMPv6.TypeCode.String()
+ decoded.L4 = decodeICMPv6(&d.overlay.ICMPv6)
+ decoded.Summary = "ICMPv6 " + d.overlay.ICMPv6.TypeCode.String()
case layers.LayerTypeVRRP:
- l4 = decodeVRRP(&packet.overlay.VRRPv2)
- summary = "VRRP " + packet.overlay.VRRPv2.Type.String()
+ decoded.L4 = decodeVRRP(&d.overlay.VRRPv2)
+ decoded.Summary = "VRRP " + d.overlay.VRRPv2.Type.String()
case layers.LayerTypeIGMP:
- l4 = decodeIGMP(&packet.overlay.IGMPv1or2)
- summary = "IGMP " + packet.overlay.IGMPv1or2.Type.String()
+ decoded.L4 = decodeIGMP(&d.overlay.IGMPv1or2)
+ decoded.Summary = "IGMP " + d.overlay.IGMPv1or2.Type.String()
}
}
})
}
}
+
+type customDecoder struct {
+}
+
+// DecodePacket implements options.L34PacketDecoder.
+func (c customDecoder) DecodePacket(payload []byte, decoded *flowpb.Flow, isL3Device bool, isIPv6 bool, isVXLAN bool, isGeneve bool) (sourceIP netip.Addr, destinationIP netip.Addr, sourcePort uint16, destinationPort uint16, err error) {
+ sourceIP = netip.MustParseAddr("10.13.37.1")
+ destinationIP = netip.MustParseAddr("10.13.37.2")
+ sourcePort = 10011
+ destinationPort = 443
+
+ decoded.IP = &flowpb.IP{
+ Source: sourceIP.String(),
+ Destination: destinationIP.String(),
+ IpVersion: flowpb.IPVersion_IPv4,
+ }
+
+ return
+}
+
+func TestDecode_CustomPacketDecoder(t *testing.T) {
+ event := monitor.TraceNotify{
+ Type: byte(monitorAPI.MessageTypeTrace),
+ Source: localEP,
+ ObsPoint: monitorAPI.TraceFromLxc,
+ Reason: monitor.TraceReasonCtEstablished,
+ Version: monitor.TraceNotifyVersion2,
+ }
+
+ var l []gopacket.SerializableLayer
+ l = append(l, &layers.Ethernet{SrcMAC: srcMAC, DstMAC: dstMAC, EthernetType: layers.EthernetTypeIPv4}, &layers.IPv4{SrcIP: egressTuple.src.AsSlice(), DstIP: egressTuple.dst.AsSlice()})
+
+ want := &flowpb.Flow{
+ EventType: &flowpb.CiliumEventType{
+ Type: 4,
+ SubType: 5,
+ },
+ Type: flowpb.FlowType_L3_L4,
+ Source: &flowpb.Endpoint{},
+ Destination: &flowpb.Endpoint{},
+ IP: &flowpb.IP{
+ IpVersion: flowpb.IPVersion_IPv4,
+ Source: "10.13.37.1",
+ Destination: "10.13.37.2",
+ },
+ TraceObservationPoint: flowpb.TraceObservationPoint_FROM_ENDPOINT,
+ TraceReason: flowpb.TraceReason_ESTABLISHED,
+ TrafficDirection: flowpb.TrafficDirection_INGRESS,
+ IsReply: wrapperspb.Bool(false),
+ Verdict: flowpb.Verdict_FORWARDED,
+ }
+
+ data, err := testutils.CreateL3L4Payload(event, l...)
+ if err != nil {
+ t.Fatalf("Unexpected error from CreateL3L4Payload(%T, ...): %v", event, err)
+ }
+
+ parser, err := New(hivetest.Logger(t), defaultEndpointGetter, nil, nil, nil, nil, nil, options.WithL34PacketDecoder(customDecoder{}))
+ require.NoError(t, err)
+
+ got := &flowpb.Flow{}
+ if err := parser.Decode(data, got); err != nil {
+ t.Fatalf("Unexpected error from Decode(data, %T): %v", got, err)
+ }
+
+ opts := []cmp.Option{
+ protocmp.Transform(),
+ protocmp.IgnoreFields(&flowpb.Flow{}, "reply"),
+ }
+ if diff := cmp.Diff(want, got, opts...); diff != "" {
+ t.Errorf("Unexpected diff (-want +got):\n%s", diff)
+ }
+}