]> git.feebdaed.xyz Git - 0xmirror/cilium.git/commitdiff
monitor: add parser support for new notify extensions
authorFabian Fischer <fabian.fischer@isovalent.com>
Mon, 8 Dec 2025 09:30:58 +0000 (10:30 +0100)
committerAndré Martins <aanm@users.noreply.github.com>
Wed, 17 Dec 2025 08:36:12 +0000 (08:36 +0000)
Make sure the upstream monitor event parsing logic still works, even if
a downstream project adds notify extensions.

The main change is, that downstream projects will have to register
the sizes of the extensions, so that the upstream project can still
properly determine the data offset, i.e. where the notify event stops and
the packet begins.

Signed-off-by: Fabian Fischer <fabian.fischer@isovalent.com>
pkg/hubble/parser/threefour/parser.go
pkg/monitor/datapath_debug.go
pkg/monitor/datapath_debug_test.go
pkg/monitor/datapath_drop.go
pkg/monitor/datapath_drop_test.go
pkg/monitor/datapath_policy.go
pkg/monitor/datapath_policy_test.go
pkg/monitor/datapath_trace.go
pkg/monitor/datapath_trace_test.go

index d4693c4b2ca99bf313634a8fc0caa8bc64582498..fee61daf58c12b23f558fd36004ac052f2d7039e 100644 (file)
@@ -197,7 +197,7 @@ func (p *Parser) Decode(data []byte, decoded *pb.Flow) error {
                        return fmt.Errorf("failed to parse policy verdict: %w", err)
                }
                eventSubType = pvn.SubType
-               packetOffset = monitor.PolicyVerdictNotifyLen
+               packetOffset = int(pvn.DataOffset())
                authType = pb.AuthType(pvn.GetAuthType())
        case monitorAPI.MessageTypeCapture:
                dbg = &monitor.DebugCapture{}
@@ -205,7 +205,7 @@ func (p *Parser) Decode(data []byte, decoded *pb.Flow) error {
                        return fmt.Errorf("failed to parse debug capture: %w", err)
                }
                eventSubType = dbg.SubType
-               packetOffset = monitor.DebugCaptureLen
+               packetOffset = int(dbg.DataOffset())
        default:
                return errors.NewErrInvalidType(eventType)
        }
index d4630b4c8e40f321b8bead265d84ea6bc3bf02fb..8fe4ebe5d8ec77f0f3bed13a560b82a6c98e78e2 100644 (file)
@@ -432,6 +432,19 @@ const (
        DebugCaptureLen = 24
 )
 
+const DebugCaptureExtensionDisabled = 0
+
+var (
+       // Downstream projects should register introduced extensions length so that
+       // the upstream parsing code still works even if the DP events contain
+       // additional fields.
+       debugCaptureExtensionLengthFromVersion = map[uint8]uint{
+               // The DebugCaptureExtension is intended for downstream extensions and
+               // should not be used in the upstream project.
+               DebugCaptureExtensionDisabled: 0,
+       }
+)
+
 // DebugCapture is the metadata sent along with a captured packet frame
 type DebugCapture struct {
        api.DefaultSrcDstGetter
@@ -487,12 +500,18 @@ func (n *DebugCapture) Decode(data []byte) error {
        return nil
 }
 
+// DataOffset returns the offset from the beginning of DebugCapture where the
+// notification data begins.
+func (n *DebugCapture) DataOffset() uint {
+       return DebugCaptureLen + debugCaptureExtensionLengthFromVersion[n.ExtVersion]
+}
+
 // DumpInfo prints a summary of the capture messages.
 func (n *DebugCapture) DumpInfo(buf *bufio.Writer, data []byte, linkMonitor getters.LinkGetter) {
        prefix := n.infoPrefix(linkMonitor)
 
        if len(prefix) > 0 {
-               fmt.Fprintf(buf, "%s: %s\n", prefix, GetConnectionSummary(data[DebugCaptureLen:], nil))
+               fmt.Fprintf(buf, "%s: %s\n", prefix, GetConnectionSummary(data[n.DataOffset():], nil))
        }
 }
 
@@ -523,8 +542,8 @@ func (n *DebugCapture) infoPrefix(linkMonitor getters.LinkGetter) string {
 func (n *DebugCapture) DumpVerbose(buf *bufio.Writer, dissect bool, data []byte, prefix string) {
        fmt.Fprintf(buf, "%s MARK %#x FROM %d DEBUG: %d bytes, %s", prefix, n.Hash, n.Source, n.Len, n.subTypeString())
 
-       if n.Len > 0 && len(data) > DebugCaptureLen {
-               Dissect(buf, dissect, data[DebugCaptureLen:], nil)
+       if n.Len > 0 && len(data) > int(n.DataOffset()) {
+               Dissect(buf, dissect, data[n.DataOffset():], nil)
        }
 }
 
@@ -555,7 +574,7 @@ func (n *DebugCapture) getJSON(data []byte, cpuPrefix string, linkMonitor getter
 
        v := DebugCaptureToVerbose(n, linkMonitor)
        v.CPUPrefix = cpuPrefix
-       v.Summary = GetConnectionSummary(data[DebugCaptureLen:], nil)
+       v.Summary = GetConnectionSummary(data[n.DataOffset():], nil)
 
        ret, err := json.Marshal(v)
        return string(ret), err
index 8c18d70a49cee8b04a1b1eaa124ed7c054c8b144..ba1d905acb2aff8290d3f4706413e76cbad861cc 100644 (file)
@@ -45,6 +45,86 @@ func TestDecodeDebugCapture(t *testing.T) {
        require.Equal(t, input.Arg2, output.Arg2)
 }
 
+func TestDecodeDebugCaptureExt(t *testing.T) {
+       setTmpExtVer := func(extVer uint8, extLen uint) {
+               oldLen, ok := debugCaptureExtensionLengthFromVersion[extVer]
+               if !ok {
+                       t.Cleanup(func() { delete(debugCaptureExtensionLengthFromVersion, extVer) })
+               } else {
+                       t.Cleanup(func() { debugCaptureExtensionLengthFromVersion[extVer] = oldLen })
+               }
+               debugCaptureExtensionLengthFromVersion[extVer] = extLen
+       }
+
+       setTmpExtVer(1, 4)
+       setTmpExtVer(2, 8)
+       setTmpExtVer(3, 16)
+
+       tcs := []struct {
+               name      string
+               dc        DebugCapture
+               extension []uint32
+       }{
+               {
+                       name: "no extension",
+                       dc: DebugCapture{
+                               Version:    1,
+                               ExtVersion: 0,
+                       },
+               },
+               {
+                       name: "extension 1",
+                       dc: DebugCapture{
+                               Version:    1,
+                               ExtVersion: 1,
+                       },
+                       extension: []uint32{
+                               0xC0FFEE,
+                       },
+               },
+               {
+                       name: "extension 2",
+                       dc: DebugCapture{
+                               Version:    1,
+                               ExtVersion: 2,
+                       },
+                       extension: []uint32{
+                               0xC0FFEE,
+                               0xDECAFBAD,
+                       },
+               },
+               {
+                       name: "extension 2",
+                       dc: DebugCapture{
+                               Version:    1,
+                               ExtVersion: 3,
+                       },
+                       extension: []uint32{
+                               0xC0FFEE,
+                               0xDECAFBAD,
+                               0xFA1AFE1,
+                               0xF00DF00D,
+                       },
+               },
+       }
+
+       for _, tc := range tcs {
+               buf := bytes.NewBuffer(nil)
+               err := binary.Write(buf, byteorder.Native, tc.dc)
+               require.NoError(t, err)
+               err = binary.Write(buf, byteorder.Native, tc.extension)
+               require.NoError(t, err)
+               err = binary.Write(buf, byteorder.Native, uint32(0xDEADBEEF))
+               require.NoError(t, err)
+
+               output := &DebugCapture{}
+               err = output.Decode(buf.Bytes())
+               require.NoError(t, err)
+
+               require.Equal(t, uint32(0xDEADBEEF), byteorder.Native.Uint32(buf.Bytes()[output.DataOffset():]))
+       }
+}
+
 func BenchmarkNewDecodeDebugCapture(b *testing.B) {
        input := &DebugCapture{}
        buf := bytes.NewBuffer(nil)
index 87c18117b74e6140085c9f9494d0f4d304e5d2f2..40a21dae5b91b02efc65d48b9aeb1a1075ca6052 100644 (file)
@@ -49,6 +49,19 @@ var (
        }
 )
 
+const DropNotifyExtensionDisabled = 0
+
+var (
+       // Downstream projects should register introduced extensions length so that
+       // the upstream parsing code still works even if the DP events contain
+       // additional fields.
+       dropNotifyExtensionLengthFromVersion = map[uint8]uint{
+               // The DropNotifyExtension is intended for downstream extensions and
+               // should not be used in the upstream project.
+               DropNotifyExtensionDisabled: 0,
+       }
+)
+
 // DropNotify is the message format of a drop notification in the BPF ring buffer
 type DropNotify struct {
        Type       uint8                    `align:"type"`
@@ -178,7 +191,7 @@ func (n *DropNotify) IsVXLAN() bool {
 //
 // Returns zero for invalid or unknown DropNotify messages.
 func (n *DropNotify) DataOffset() uint {
-       return dropNotifyLengthFromVersion[n.Version]
+       return dropNotifyLengthFromVersion[n.Version] + dropNotifyExtensionLengthFromVersion[n.ExtVersion]
 }
 
 // DumpInfo prints a summary of the drop messages.
index 0c2c5bedb494685822d12903634bf7faf944b782..da5a101291c51899a5536a6e01e9c585086aa188 100644 (file)
@@ -206,6 +206,86 @@ func TestDecodeDropNotify(t *testing.T) {
        }
 }
 
+func TestDecodeDropNotifyExtension(t *testing.T) {
+       setTmpExtVer := func(extVer uint8, extLen uint) {
+               oldLen, ok := dropNotifyExtensionLengthFromVersion[extVer]
+               if !ok {
+                       t.Cleanup(func() { delete(dropNotifyExtensionLengthFromVersion, extVer) })
+               } else {
+                       t.Cleanup(func() { dropNotifyExtensionLengthFromVersion[extVer] = oldLen })
+               }
+               dropNotifyExtensionLengthFromVersion[extVer] = extLen
+       }
+
+       setTmpExtVer(1, 4)
+       setTmpExtVer(2, 8)
+       setTmpExtVer(3, 16)
+
+       tcs := []struct {
+               name      string
+               dn        DropNotify
+               extension []uint32
+       }{
+               {
+                       name: "no extension",
+                       dn: DropNotify{
+                               Version:    3,
+                               ExtVersion: 0,
+                       },
+               },
+               {
+                       name: "extension 1",
+                       dn: DropNotify{
+                               Version:    3,
+                               ExtVersion: 1,
+                       },
+                       extension: []uint32{
+                               0xC0FFEE,
+                       },
+               },
+               {
+                       name: "extension 2",
+                       dn: DropNotify{
+                               Version:    3,
+                               ExtVersion: 2,
+                       },
+                       extension: []uint32{
+                               0xC0FFEE,
+                               0xDECAFBAD,
+                       },
+               },
+               {
+                       name: "extension 2",
+                       dn: DropNotify{
+                               Version:    3,
+                               ExtVersion: 3,
+                       },
+                       extension: []uint32{
+                               0xC0FFEE,
+                               0xDECAFBAD,
+                               0xFA1AFE1,
+                               0xF00DF00D,
+                       },
+               },
+       }
+
+       for _, tc := range tcs {
+               buf := bytes.NewBuffer(nil)
+               err := binary.Write(buf, byteorder.Native, tc.dn)
+               require.NoError(t, err)
+               err = binary.Write(buf, byteorder.Native, tc.extension)
+               require.NoError(t, err)
+               err = binary.Write(buf, byteorder.Native, uint32(0xDEADBEEF))
+               require.NoError(t, err)
+
+               output := &DropNotify{}
+               err = output.Decode(buf.Bytes())
+               require.NoError(t, err)
+
+               require.Equal(t, uint32(0xDEADBEEF), byteorder.Native.Uint32(buf.Bytes()[output.DataOffset():]))
+       }
+}
+
 func BenchmarkNewDropNotifyV1_Decode(b *testing.B) {
        input := DropNotify{}
        buf := bytes.NewBuffer(nil)
index 9d7e7ad796db9ea3f446c5e97580b08d6ac67e2f..28877baa6a556226c984d5939552f59402b4692f 100644 (file)
@@ -46,6 +46,19 @@ const (
        PolicyVerdictNotifyFlagMatchTypeBitOffset = 3
 )
 
+const PolicyVerdictExtensionDisabled = 0
+
+var (
+       // Downstream projects should register introduced extensions length so that
+       // the upstream parsing code still works even if the DP events contain
+       // additional fields.
+       policyVerdictExtensionLengthFromVersion = map[uint8]uint{
+               // The PolicyVerdictExtension is intended for downstream extensions and
+               // should not be used in the upstream project.
+               PolicyVerdictExtensionDisabled: 0,
+       }
+)
+
 // PolicyVerdictNotify is the message format of a policy verdict notification in the bpf ring buffer
 type PolicyVerdictNotify struct {
        Type        uint8                    `align:"type"`
@@ -137,6 +150,12 @@ func (n *PolicyVerdictNotify) IsTrafficAudited() bool {
        return (n.Flags&PolicyVerdictNotifyFlagIsAudited > 0)
 }
 
+// DataOffset returns the offset from the beginning of PolicyVerdictNotify where the
+// notification data begins.
+func (n *PolicyVerdictNotify) DataOffset() uint {
+       return PolicyVerdictNotifyLen + policyVerdictExtensionLengthFromVersion[n.ExtVersion]
+}
+
 // GetPolicyActionString returns the action string corresponding to the action
 func GetPolicyActionString(verdict int32, audit bool) string {
        if audit {
@@ -172,5 +191,5 @@ func (n *PolicyVerdictNotify) DumpInfo(buf *bufio.Writer, data []byte, numeric a
        fmt.Fprintf(buf, ", proto %d, %s, action %s, auth: %s, match %s, %s\n", n.Proto, dir,
                GetPolicyActionString(n.Verdict, n.IsTrafficAudited()),
                n.GetAuthType(), n.GetPolicyMatchType(),
-               GetConnectionSummary(data[PolicyVerdictNotifyLen:], &decodeOpts{IsL3Device: n.IsTrafficL3Device(), IsIPv6: n.IsTrafficIPv6()}))
+               GetConnectionSummary(data[n.DataOffset():], &decodeOpts{IsL3Device: n.IsTrafficL3Device(), IsIPv6: n.IsTrafficIPv6()}))
 }
index 39a69ec44edbea02c18a9bab5b8cdfde97b572b0..a731e4b865077d279c9c2708f161d7532f3030a1 100644 (file)
@@ -58,6 +58,86 @@ func TestDecodePolicyVerdicyNotify(t *testing.T) {
        require.Equal(t, input.Cookie, output.Cookie)
 }
 
+func TestDecodePolicyVerdictNotifyExtension(t *testing.T) {
+       setTmpExtVer := func(extVer uint8, extLen uint) {
+               oldLen, ok := policyVerdictExtensionLengthFromVersion[extVer]
+               if !ok {
+                       t.Cleanup(func() { delete(policyVerdictExtensionLengthFromVersion, extVer) })
+               } else {
+                       t.Cleanup(func() { policyVerdictExtensionLengthFromVersion[extVer] = oldLen })
+               }
+               policyVerdictExtensionLengthFromVersion[extVer] = extLen
+       }
+
+       setTmpExtVer(1, 4)
+       setTmpExtVer(2, 8)
+       setTmpExtVer(3, 16)
+
+       tcs := []struct {
+               name      string
+               pvn       PolicyVerdictNotify
+               extension []uint32
+       }{
+               {
+                       name: "no extension",
+                       pvn: PolicyVerdictNotify{
+                               Version:    1,
+                               ExtVersion: 0,
+                       },
+               },
+               {
+                       name: "extension 1",
+                       pvn: PolicyVerdictNotify{
+                               Version:    1,
+                               ExtVersion: 1,
+                       },
+                       extension: []uint32{
+                               0xC0FFEE,
+                       },
+               },
+               {
+                       name: "extension 2",
+                       pvn: PolicyVerdictNotify{
+                               Version:    3,
+                               ExtVersion: 2,
+                       },
+                       extension: []uint32{
+                               0xC0FFEE,
+                               0xDECAFBAD,
+                       },
+               },
+               {
+                       name: "extension 2",
+                       pvn: PolicyVerdictNotify{
+                               Version:    3,
+                               ExtVersion: 3,
+                       },
+                       extension: []uint32{
+                               0xC0FFEE,
+                               0xDECAFBAD,
+                               0xFA1AFE1,
+                               0xF00DF00D,
+                       },
+               },
+       }
+
+       for _, tc := range tcs {
+               buf := bytes.NewBuffer(nil)
+               err := binary.Write(buf, byteorder.Native, tc.pvn)
+               require.NoError(t, err)
+               err = binary.Write(buf, byteorder.Native, tc.extension)
+               require.NoError(t, err)
+               err = binary.Write(buf, byteorder.Native, uint32(0xDEADBEEF))
+               require.NoError(t, err)
+
+               output := &PolicyVerdictNotify{}
+               err = output.Decode(buf.Bytes())
+               require.NoError(t, err)
+
+               require.Equal(t, uint32(0xDEADBEEF), byteorder.Native.Uint32(buf.Bytes()[output.DataOffset():]))
+       }
+}
+
 func BenchmarkNewDecodePolicyVerdictNotify(b *testing.B) {
        input := &PolicyVerdictNotify{}
        buf := bytes.NewBuffer(nil)
index f06e9ed89db94dadab4c76995aab72757c1e531c..62d7dae42a1e15ee9a4b5afcdeac8bea41adf8f9 100644 (file)
@@ -189,6 +189,19 @@ var (
        }
 )
 
+const TraceNotifyExtensionDisabled = 0
+
+var (
+       // Downstream projects should register introduced extensions length so that
+       // the upstream parsing code still works even if the DP events contain
+       // additional fields.
+       traceNotifyExtensionLengthFromVersion = map[uint8]uint{
+               // The TraceNotifyExtension is intended for downstream extensions and
+               // should not be used in the upstream project.
+               TraceNotifyExtensionDisabled: 0,
+       }
+)
+
 /* Reasons for forwarding a packet, keep in sync with api/v1/flow/flow.proto */
 const (
        TraceReasonPolicy = iota
@@ -316,7 +329,7 @@ func (n *TraceNotify) OriginalIP() net.IP {
 //
 // Returns zero for invalid or unknown TraceNotify messages.
 func (n *TraceNotify) DataOffset() uint {
-       return traceNotifyLength[n.Version]
+       return traceNotifyLength[n.Version] + traceNotifyExtensionLengthFromVersion[n.ExtVersion]
 }
 
 // DumpInfo prints a summary of the trace messages.
index 855843e300426d53421d3dd4d97bc0965b09d948..b5eff5ca68c68943462d9b57843baabacf1bd788 100644 (file)
@@ -66,6 +66,86 @@ func TestDecodeTraceNotify(t *testing.T) {
        require.Equal(t, in.IPTraceID, out.IPTraceID)
 }
 
+func TestDecodeTraceNotifyExtension(t *testing.T) {
+       setTmpExtVer := func(extVer uint8, extLen uint) {
+               oldLen, ok := traceNotifyExtensionLengthFromVersion[extVer]
+               if !ok {
+                       t.Cleanup(func() { delete(traceNotifyExtensionLengthFromVersion, extVer) })
+               } else {
+                       t.Cleanup(func() { traceNotifyExtensionLengthFromVersion[extVer] = oldLen })
+               }
+               traceNotifyExtensionLengthFromVersion[extVer] = extLen
+       }
+
+       setTmpExtVer(1, 4)
+       setTmpExtVer(2, 8)
+       setTmpExtVer(3, 16)
+
+       tcs := []struct {
+               name      string
+               tn        TraceNotify
+               extension []uint32
+       }{
+               {
+                       name: "no extension",
+                       tn: TraceNotify{
+                               Version:    TraceNotifyVersion2,
+                               ExtVersion: 0,
+                       },
+               },
+               {
+                       name: "extension 1",
+                       tn: TraceNotify{
+                               Version:    TraceNotifyVersion2,
+                               ExtVersion: 1,
+                       },
+                       extension: []uint32{
+                               0xC0FFEE,
+                       },
+               },
+               {
+                       name: "extension 2",
+                       tn: TraceNotify{
+                               Version:    TraceNotifyVersion2,
+                               ExtVersion: 2,
+                       },
+                       extension: []uint32{
+                               0xC0FFEE,
+                               0xDECAFBAD,
+                       },
+               },
+               {
+                       name: "extension 2",
+                       tn: TraceNotify{
+                               Version:    TraceNotifyVersion2,
+                               ExtVersion: 3,
+                       },
+                       extension: []uint32{
+                               0xC0FFEE,
+                               0xDECAFBAD,
+                               0xFA1AFE1,
+                               0xF00DF00D,
+                       },
+               },
+       }
+
+       for _, tc := range tcs {
+               buf := bytes.NewBuffer(nil)
+               err := binary.Write(buf, byteorder.Native, tc.tn)
+               require.NoError(t, err)
+               err = binary.Write(buf, byteorder.Native, tc.extension)
+               require.NoError(t, err)
+               err = binary.Write(buf, byteorder.Native, uint32(0xDEADBEEF))
+               require.NoError(t, err)
+
+               output := &TraceNotify{}
+               err = output.Decode(buf.Bytes())
+               require.NoError(t, err)
+
+               require.Equal(t, uint32(0xDEADBEEF), byteorder.Native.Uint32(buf.Bytes()[output.DataOffset():]))
+       }
+}
+
 func TestDecodeTraceNotifyErrors(t *testing.T) {
        tn := TraceNotify{}
        err := tn.Decode([]byte{})