]> git.feebdaed.xyz Git - 0xmirror/ebpf.git/commitdiff
ringbuf: Windows support
authorLorenz Bauer <lmb@isovalent.com>
Fri, 15 Aug 2025 14:42:20 +0000 (15:42 +0100)
committerLorenz Bauer <lmb@users.noreply.github.com>
Mon, 18 Aug 2025 09:26:26 +0000 (10:26 +0100)
Signed-off-by: Lorenz Bauer <lmb@isovalent.com>
internal/efw/map.go [new file with mode: 0644]
ringbuf/doc.go
ringbuf/helper_other_test.go [new file with mode: 0644]
ringbuf/helper_windows_test.go [new file with mode: 0644]
ringbuf/reader.go
ringbuf/reader_other.go [new file with mode: 0644]
ringbuf/reader_test.go
ringbuf/reader_windows.go [new file with mode: 0644]
ringbuf/ring.go
ringbuf/ring_other.go [new file with mode: 0644]
ringbuf/ring_windows.go [new file with mode: 0644]

diff --git a/internal/efw/map.go b/internal/efw/map.go
new file mode 100644 (file)
index 0000000..82f510f
--- /dev/null
@@ -0,0 +1,109 @@
+//go:build windows
+
+package efw
+
+import (
+       "runtime"
+       "syscall"
+       "unsafe"
+
+       "golang.org/x/sys/windows"
+)
+
+/*
+ebpf_ring_buffer_map_map_buffer(
+
+       fd_t map_fd,
+       _Outptr_result_maybenull_ void** consumer,
+       _Outptr_result_maybenull_ const void** producer,
+       _Outptr_result_buffer_maybenull_(*data_size) const uint8_t** data,
+       _Out_ size_t* data_size) EBPF_NO_EXCEPT;
+*/
+var ebpfRingBufferMapMapBufferProc = newProc("ebpf_ring_buffer_map_map_buffer")
+
+func EbpfRingBufferMapMapBuffer(mapFd int) (consumer, producer, data *uint8, dataLen Size, _ error) {
+       addr, err := ebpfRingBufferMapMapBufferProc.Find()
+       if err != nil {
+               return nil, nil, nil, 0, err
+       }
+
+       err = errorResult(syscall.SyscallN(addr,
+               uintptr(mapFd),
+               uintptr(unsafe.Pointer(&consumer)),
+               uintptr(unsafe.Pointer(&producer)),
+               uintptr(unsafe.Pointer(&data)),
+               uintptr(unsafe.Pointer(&dataLen)),
+       ))
+       if err != nil {
+               return nil, nil, nil, 0, err
+       }
+
+       return consumer, producer, data, dataLen, nil
+}
+
+/*
+ebpf_ring_buffer_map_unmap_buffer(
+
+       fd_t map_fd, _In_ void* consumer, _In_ const void* producer, _In_ const void* data) EBPF_NO_EXCEPT;
+*/
+var ebpfRingBufferMapUnmapBufferProc = newProc("ebpf_ring_buffer_map_unmap_buffer")
+
+func EbpfRingBufferMapUnmapBuffer(mapFd int, consumer, producer, data *uint8) error {
+       addr, err := ebpfRingBufferMapUnmapBufferProc.Find()
+       if err != nil {
+               return err
+       }
+
+       return errorResult(syscall.SyscallN(addr,
+               uintptr(mapFd),
+               uintptr(unsafe.Pointer(consumer)),
+               uintptr(unsafe.Pointer(producer)),
+               uintptr(unsafe.Pointer(data)),
+       ))
+}
+
+/*
+ebpf_result_t ebpf_map_set_wait_handle(
+
+       fd_t map_fd,
+       uint64_t index,
+       ebpf_handle_t handle)
+*/
+var ebpfMapSetWaitHandleProc = newProc("ebpf_map_set_wait_handle")
+
+func EbpfMapSetWaitHandle(mapFd int, index uint64, handle windows.Handle) error {
+       addr, err := ebpfMapSetWaitHandleProc.Find()
+       if err != nil {
+               return err
+       }
+
+       return errorResult(syscall.SyscallN(addr,
+               uintptr(mapFd),
+               uintptr(index),
+               uintptr(handle),
+       ))
+}
+
+/*
+ebpf_result_t ebpf_ring_buffer_map_write(
+
+       fd_t ring_buffer_map_fd,
+       const void* data,
+       size_t data_length)
+*/
+var ebpfRingBufferMapWriteProc = newProc("ebpf_ring_buffer_map_write")
+
+func EbpfRingBufferMapWrite(ringBufferMapFd int, data []byte) error {
+       addr, err := ebpfRingBufferMapWriteProc.Find()
+       if err != nil {
+               return err
+       }
+
+       err = errorResult(syscall.SyscallN(addr,
+               uintptr(ringBufferMapFd),
+               uintptr(unsafe.Pointer(&data[0])),
+               uintptr(len(data)),
+       ))
+       runtime.KeepAlive(data)
+       return err
+}
index 9e450121878057ab0bea9d79881c79064cd833cb..eb9617e63a2bbe861424c46283763bb68b8f7360 100644 (file)
@@ -1,4 +1,4 @@
-// Package ringbuf allows interacting with Linux BPF ring buffer.
+// Package ringbuf allows interacting with the BPF ring buffer.
 //
 // BPF allows submitting custom events to a BPF ring buffer map set up
 // by userspace. This is very useful to push things like packet samples
diff --git a/ringbuf/helper_other_test.go b/ringbuf/helper_other_test.go
new file mode 100644 (file)
index 0000000..6068197
--- /dev/null
@@ -0,0 +1,92 @@
+//go:build !windows
+
+package ringbuf
+
+import (
+       "testing"
+
+       "github.com/go-quicktest/qt"
+
+       "github.com/cilium/ebpf"
+       "github.com/cilium/ebpf/asm"
+)
+
+func mustOutputSamplesProg(tb testing.TB, sampleMessages ...sampleMessage) (*ebpf.Program, *ebpf.Map) {
+       tb.Helper()
+
+       events, err := ebpf.NewMap(&ebpf.MapSpec{
+               Type:       ebpf.RingBuf,
+               MaxEntries: 4096,
+       })
+       qt.Assert(tb, qt.IsNil(err))
+       tb.Cleanup(func() {
+               events.Close()
+       })
+
+       var maxSampleSize int
+       for _, sampleMessage := range sampleMessages {
+               if sampleMessage.size > maxSampleSize {
+                       maxSampleSize = sampleMessage.size
+               }
+       }
+
+       insns := asm.Instructions{
+               asm.LoadImm(asm.R0, 0x0102030404030201, asm.DWord),
+               asm.Mov.Reg(asm.R9, asm.R1),
+       }
+
+       bufDwords := (maxSampleSize / 8) + 1
+       for i := range bufDwords {
+               insns = append(insns,
+                       asm.StoreMem(asm.RFP, int16(i+1)*-8, asm.R0, asm.DWord),
+               )
+       }
+
+       for _, sampleMessage := range sampleMessages {
+               insns = append(insns,
+                       asm.LoadMapPtr(asm.R1, events.FD()),
+                       asm.Mov.Imm(asm.R2, int32(sampleMessage.size)),
+                       asm.Mov.Imm(asm.R3, int32(0)),
+                       asm.FnRingbufReserve.Call(),
+                       asm.JEq.Imm(asm.R0, 0, "exit"),
+                       asm.Mov.Reg(asm.R5, asm.R0),
+               )
+               for i := range sampleMessage.size {
+                       insns = append(insns,
+                               asm.LoadMem(asm.R4, asm.RFP, int16(i+1)*-1, asm.Byte),
+                               asm.StoreMem(asm.R5, int16(i), asm.R4, asm.Byte),
+                       )
+               }
+
+               if sampleMessage.discard {
+                       insns = append(insns,
+                               asm.Mov.Reg(asm.R1, asm.R5),
+                               asm.Mov.Imm(asm.R2, sampleMessage.flags),
+                               asm.FnRingbufDiscard.Call(),
+                       )
+               } else {
+                       insns = append(insns,
+                               asm.Mov.Reg(asm.R1, asm.R5),
+                               asm.Mov.Imm(asm.R2, sampleMessage.flags),
+                               asm.FnRingbufSubmit.Call(),
+                       )
+               }
+       }
+
+       insns = append(insns,
+               asm.Mov.Imm(asm.R0, int32(0)).WithSymbol("exit"),
+               asm.Return(),
+       )
+
+       prog, err := ebpf.NewProgram(&ebpf.ProgramSpec{
+               License:      "MIT",
+               Type:         ebpf.XDP,
+               Instructions: insns,
+       })
+       qt.Assert(tb, qt.IsNil(err))
+       tb.Cleanup(func() {
+               prog.Close()
+       })
+
+       return prog, events
+}
diff --git a/ringbuf/helper_windows_test.go b/ringbuf/helper_windows_test.go
new file mode 100644 (file)
index 0000000..18338d2
--- /dev/null
@@ -0,0 +1,75 @@
+package ringbuf
+
+import (
+       "testing"
+
+       "github.com/go-quicktest/qt"
+
+       "github.com/cilium/ebpf"
+       "github.com/cilium/ebpf/asm"
+)
+
+func mustOutputSamplesProg(tb testing.TB, sampleMessages ...sampleMessage) (*ebpf.Program, *ebpf.Map) {
+       tb.Helper()
+
+       events, err := ebpf.NewMap(&ebpf.MapSpec{
+               Type:       ebpf.WindowsRingBuf,
+               MaxEntries: 4096,
+       })
+       qt.Assert(tb, qt.IsNil(err))
+       tb.Cleanup(func() {
+               events.Close()
+       })
+
+       var maxSampleSize int
+       for _, sampleMessage := range sampleMessages {
+               if sampleMessage.size > maxSampleSize {
+                       maxSampleSize = sampleMessage.size
+               }
+       }
+
+       insns := asm.Instructions{
+               asm.LoadImm(asm.R0, 0x0102030404030201, asm.DWord),
+               asm.Mov.Reg(asm.R9, asm.R1),
+       }
+
+       bufDwords := (maxSampleSize / 8) + 1
+       for i := range bufDwords {
+               insns = append(insns,
+                       asm.StoreMem(asm.RFP, int16(i+1)*-8, asm.R0, asm.DWord),
+               )
+       }
+
+       for _, sampleMessage := range sampleMessages {
+               if sampleMessage.discard {
+                       tb.Skip("discard is not supported on Windows")
+               }
+
+               insns = append(insns,
+                       asm.LoadMapPtr(asm.R1, events.FD()),
+                       asm.Mov.Reg(asm.R2, asm.RFP),
+                       asm.Add.Imm(asm.R2, -int32(8*bufDwords)),
+                       asm.Mov.Imm(asm.R3, int32(sampleMessage.size)),
+                       asm.Mov.Imm(asm.R4, sampleMessage.flags),
+                       asm.WindowsFnRingbufOutput.Call(),
+                       asm.JNE.Imm(asm.R0, 0, "exit"),
+               )
+       }
+
+       insns = append(insns,
+               asm.Mov.Imm(asm.R0, int32(0)),
+               asm.Return().WithSymbol("exit"),
+       )
+
+       prog, err := ebpf.NewProgram(&ebpf.ProgramSpec{
+               License:      "MIT",
+               Type:         ebpf.WindowsXDPTest,
+               Instructions: insns,
+       })
+       qt.Assert(tb, qt.IsNil(err))
+       tb.Cleanup(func() {
+               prog.Close()
+       })
+
+       return prog, events
+}
index 133772256fc0bd75a6ca0ec79751b4a962583423..19470342ff24ce7dfb44ee8767841b4fc102a138 100644 (file)
@@ -1,5 +1,3 @@
-//go:build !windows
-
 package ringbuf
 
 import (
@@ -11,16 +9,14 @@ import (
        "unsafe"
 
        "github.com/cilium/ebpf"
-       "github.com/cilium/ebpf/internal/epoll"
+       "github.com/cilium/ebpf/internal/platform"
        "github.com/cilium/ebpf/internal/sys"
-       "github.com/cilium/ebpf/internal/unix"
 )
 
 var (
-       ErrClosed  = os.ErrClosed
-       ErrFlushed = epoll.ErrFlushed
-       errEOR     = errors.New("end of ring")
-       errBusy    = errors.New("sample not committed yet")
+       ErrClosed = os.ErrClosed
+       errEOR    = errors.New("end of ring")
+       errBusy   = errors.New("sample not committed yet")
 )
 
 // ringbufHeader from 'struct bpf_ringbuf_hdr' in kernel/bpf/ringbuf.c
@@ -53,21 +49,20 @@ type Record struct {
 // Reader allows reading bpf_ringbuf_output
 // from user space.
 type Reader struct {
-       poller *epoll.Poller
+       poller *poller
 
        // mu protects read/write access to the Reader structure
-       mu          sync.Mutex
-       ring        *ringbufEventRing
-       epollEvents []unix.EpollEvent
-       haveData    bool
-       deadline    time.Time
-       bufferSize  int
-       pendingErr  error
+       mu         sync.Mutex
+       ring       *ringbufEventRing
+       haveData   bool
+       deadline   time.Time
+       bufferSize int
+       pendingErr error
 }
 
 // NewReader creates a new BPF ringbuf reader.
 func NewReader(ringbufMap *ebpf.Map) (*Reader, error) {
-       if ringbufMap.Type() != ebpf.RingBuf {
+       if ringbufMap.Type() != ebpf.RingBuf && ringbufMap.Type() != ebpf.WindowsRingBuf {
                return nil, fmt.Errorf("invalid Map type: %s", ringbufMap.Type())
        }
 
@@ -76,16 +71,11 @@ func NewReader(ringbufMap *ebpf.Map) (*Reader, error) {
                return nil, fmt.Errorf("ringbuffer map size %d is zero or not a power of two", maxEntries)
        }
 
-       poller, err := epoll.New()
+       poller, err := newPoller(ringbufMap.FD())
        if err != nil {
                return nil, err
        }
 
-       if err := poller.Add(ringbufMap.FD(), 0); err != nil {
-               poller.Close()
-               return nil, err
-       }
-
        ring, err := newRingBufEventRing(ringbufMap.FD(), maxEntries)
        if err != nil {
                poller.Close()
@@ -93,10 +83,13 @@ func NewReader(ringbufMap *ebpf.Map) (*Reader, error) {
        }
 
        return &Reader{
-               poller:      poller,
-               ring:        ring,
-               epollEvents: make([]unix.EpollEvent, 1),
-               bufferSize:  ring.size(),
+               poller:     poller,
+               ring:       ring,
+               bufferSize: ring.size(),
+               // On Windows, the wait handle is only set when the reader is created,
+               // so we miss any wakeups that happened before.
+               // Do an opportunistic read to get any pending samples.
+               haveData: platform.IsWindows,
        }, nil
 }
 
@@ -115,12 +108,7 @@ func (r *Reader) Close() error {
        r.mu.Lock()
        defer r.mu.Unlock()
 
-       if r.ring != nil {
-               r.ring.Close()
-               r.ring = nil
-       }
-
-       return nil
+       return r.ring.Close()
 }
 
 // SetDeadline controls how long Read and ReadInto will block waiting for samples.
@@ -163,7 +151,7 @@ func (r *Reader) ReadInto(rec *Record) error {
                                return pe
                        }
 
-                       _, err := r.poller.Wait(r.epollEvents[:cap(r.epollEvents)], r.deadline)
+                       err := r.poller.Wait(r.deadline)
                        if errors.Is(err, os.ErrDeadlineExceeded) || errors.Is(err, ErrFlushed) {
                                // Ignoring this for reading a valid entry after timeout or flush.
                                // This can occur if the producer submitted to the ring buffer
diff --git a/ringbuf/reader_other.go b/ringbuf/reader_other.go
new file mode 100644 (file)
index 0000000..d5617e1
--- /dev/null
@@ -0,0 +1,41 @@
+//go:build !windows
+
+package ringbuf
+
+import (
+       "time"
+
+       "github.com/cilium/ebpf/internal/epoll"
+       "github.com/cilium/ebpf/internal/unix"
+)
+
+var ErrFlushed = epoll.ErrFlushed
+
+type poller struct {
+       *epoll.Poller
+       events []unix.EpollEvent
+}
+
+func newPoller(fd int) (*poller, error) {
+       ep, err := epoll.New()
+       if err != nil {
+               return nil, err
+       }
+
+       if err := ep.Add(fd, 0); err != nil {
+               ep.Close()
+               return nil, err
+       }
+
+       return &poller{
+               Poller: ep,
+               events: make([]unix.EpollEvent, 1),
+       }, nil
+}
+
+// Returns [os.ErrDeadlineExceeded] if a deadline was set and no wakeup was received.
+// Returns [ErrFlushed] if the ring buffer was flushed manually.
+func (p *poller) Wait(deadline time.Time) error {
+       _, err := p.Poller.Wait(p.events, deadline)
+       return err
+}
index 20e21e8ad029334d570cc0320e8ab97f232a8675..3babce1a14a886dce80a371ddcc824cb5c0164e6 100644 (file)
@@ -1,5 +1,3 @@
-//go:build !windows
-
 package ringbuf
 
 import (
@@ -12,8 +10,6 @@ import (
        "github.com/go-quicktest/qt"
        "github.com/google/go-cmp/cmp"
 
-       "github.com/cilium/ebpf"
-       "github.com/cilium/ebpf/asm"
        "github.com/cilium/ebpf/internal"
        "github.com/cilium/ebpf/internal/sys"
        "github.com/cilium/ebpf/internal/testutils"
@@ -21,8 +17,9 @@ import (
 )
 
 type sampleMessage struct {
-       size  int
-       flags int32
+       size    int
+       flags   int32
+       discard bool
 }
 
 func TestMain(m *testing.M) {
@@ -46,7 +43,7 @@ func TestRingbufReader(t *testing.T) {
                },
                {
                        name:     "send three short samples, the second is discarded",
-                       messages: []sampleMessage{{size: 5}, {size: 10}, {size: 15}},
+                       messages: []sampleMessage{{size: 5}, {size: 10, discard: true}, {size: 15}},
                        want: map[int][]byte{
                                5:  {1, 2, 3, 4, 4},
                                15: {1, 2, 3, 4, 4, 3, 2, 1, 1, 2, 3, 4, 4, 3, 2},
@@ -54,7 +51,7 @@ func TestRingbufReader(t *testing.T) {
                },
                {
                        name:     "send five samples, every even is discarded",
-                       messages: []sampleMessage{{size: 5}, {size: 10}, {size: 15}, {size: 20}, {size: 25}},
+                       messages: []sampleMessage{{size: 5}, {size: 10, discard: true}, {size: 15}, {size: 20, discard: true}, {size: 25}},
                        want: map[int][]byte{
                                5:  {1, 2, 3, 4, 4},
                                15: {1, 2, 3, 4, 4, 3, 2, 1, 1, 2, 3, 4, 4, 3, 2},
@@ -120,100 +117,6 @@ func TestRingbufReader(t *testing.T) {
        }
 }
 
-func outputSamplesProg(sampleMessages ...sampleMessage) (*ebpf.Program, *ebpf.Map, error) {
-       events, err := ebpf.NewMap(&ebpf.MapSpec{
-               Type:       ebpf.RingBuf,
-               MaxEntries: 4096,
-       })
-       if err != nil {
-               return nil, nil, err
-       }
-
-       var maxSampleSize int
-       for _, sampleMessage := range sampleMessages {
-               if sampleMessage.size > maxSampleSize {
-                       maxSampleSize = sampleMessage.size
-               }
-       }
-
-       insns := asm.Instructions{
-               asm.LoadImm(asm.R0, 0x0102030404030201, asm.DWord),
-               asm.Mov.Reg(asm.R9, asm.R1),
-       }
-
-       bufDwords := (maxSampleSize / 8) + 1
-       for i := 0; i < bufDwords; i++ {
-               insns = append(insns,
-                       asm.StoreMem(asm.RFP, int16(i+1)*-8, asm.R0, asm.DWord),
-               )
-       }
-
-       for sampleIdx, sampleMessage := range sampleMessages {
-               insns = append(insns,
-                       asm.LoadMapPtr(asm.R1, events.FD()),
-                       asm.Mov.Imm(asm.R2, int32(sampleMessage.size)),
-                       asm.Mov.Imm(asm.R3, int32(0)),
-                       asm.FnRingbufReserve.Call(),
-                       asm.JEq.Imm(asm.R0, 0, "exit"),
-                       asm.Mov.Reg(asm.R5, asm.R0),
-               )
-               for i := 0; i < sampleMessage.size; i++ {
-                       insns = append(insns,
-                               asm.LoadMem(asm.R4, asm.RFP, int16(i+1)*-1, asm.Byte),
-                               asm.StoreMem(asm.R5, int16(i), asm.R4, asm.Byte),
-                       )
-               }
-
-               // discard every even sample
-               if sampleIdx&1 != 0 {
-                       insns = append(insns,
-                               asm.Mov.Reg(asm.R1, asm.R5),
-                               asm.Mov.Imm(asm.R2, sampleMessage.flags),
-                               asm.FnRingbufDiscard.Call(),
-                       )
-               } else {
-                       insns = append(insns,
-                               asm.Mov.Reg(asm.R1, asm.R5),
-                               asm.Mov.Imm(asm.R2, sampleMessage.flags),
-                               asm.FnRingbufSubmit.Call(),
-                       )
-               }
-       }
-
-       insns = append(insns,
-               asm.Mov.Imm(asm.R0, int32(0)).WithSymbol("exit"),
-               asm.Return(),
-       )
-
-       prog, err := ebpf.NewProgram(&ebpf.ProgramSpec{
-               License:      "MIT",
-               Type:         ebpf.XDP,
-               Instructions: insns,
-       })
-       if err != nil {
-               events.Close()
-               return nil, nil, err
-       }
-
-       return prog, events, nil
-}
-
-func mustOutputSamplesProg(tb testing.TB, sampleMessages ...sampleMessage) (*ebpf.Program, *ebpf.Map) {
-       tb.Helper()
-
-       prog, events, err := outputSamplesProg(sampleMessages...)
-       if err != nil {
-               tb.Fatal(err)
-       }
-
-       tb.Cleanup(func() {
-               prog.Close()
-               events.Close()
-       })
-
-       return prog, events
-}
-
 func TestReaderBlocking(t *testing.T) {
        testutils.SkipOnOldKernel(t, "5.8", "BPF ring buffer")
 
@@ -279,8 +182,8 @@ func TestReaderNoWakeup(t *testing.T) {
 
        prog, events := mustOutputSamplesProg(t,
                sampleMessage{size: 5, flags: sys.BPF_RB_NO_WAKEUP}, // Read after timeout
-               sampleMessage{size: 6, flags: sys.BPF_RB_NO_WAKEUP}, // Discard
-               sampleMessage{size: 7, flags: sys.BPF_RB_NO_WAKEUP}) // Read won't block
+               sampleMessage{size: 7, flags: sys.BPF_RB_NO_WAKEUP}, // Read won't block
+       )
 
        rd, err := NewReader(events)
        if err != nil {
@@ -296,7 +199,7 @@ func TestReaderNoWakeup(t *testing.T) {
                t.Fatal(err)
        }
 
-       qt.Assert(t, qt.Equals(rd.AvailableBytes(), 3*16))
+       qt.Assert(t, qt.Equals(rd.AvailableBytes(), 2*16))
 
        if errno := syscall.Errno(-int32(ret)); errno != 0 {
                t.Fatal("Expected 0 as return value, got", errno)
@@ -312,7 +215,7 @@ func TestReaderNoWakeup(t *testing.T) {
                t.Errorf("Expected to read 5 bytes but got %d", len(record.RawSample))
        }
 
-       qt.Assert(t, qt.Equals(rd.AvailableBytes(), 2*16))
+       qt.Assert(t, qt.Equals(rd.AvailableBytes(), 1*16))
 
        record, err = rd.Read()
 
@@ -336,8 +239,8 @@ func TestReaderFlushPendingEvents(t *testing.T) {
 
        prog, events := mustOutputSamplesProg(t,
                sampleMessage{size: 5, flags: sys.BPF_RB_NO_WAKEUP}, // Read after Flush
-               sampleMessage{size: 6, flags: sys.BPF_RB_NO_WAKEUP}, // Discard
-               sampleMessage{size: 7, flags: sys.BPF_RB_NO_WAKEUP}) // Read won't block
+               sampleMessage{size: 7, flags: sys.BPF_RB_NO_WAKEUP}, // Read won't block
+       )
 
        rd, err := NewReader(events)
        if err != nil {
diff --git a/ringbuf/reader_windows.go b/ringbuf/reader_windows.go
new file mode 100644 (file)
index 0000000..f90bfe3
--- /dev/null
@@ -0,0 +1,105 @@
+package ringbuf
+
+import (
+       "errors"
+       "fmt"
+       "os"
+       "sync/atomic"
+       "time"
+
+       "golang.org/x/sys/windows"
+
+       "github.com/cilium/ebpf/internal"
+       "github.com/cilium/ebpf/internal/efw"
+)
+
+type poller struct {
+       closed      atomic.Bool
+       handle      windows.Handle
+       flushHandle windows.Handle
+       handles     []windows.Handle
+}
+
+var ErrFlushed = errors.New("ring buffer flushed")
+
+func newPoller(fd int) (*poller, error) {
+       handle, err := windows.CreateEvent(nil, 0, 0, nil)
+       if err != nil {
+               return nil, err
+       }
+
+       flushHandle, err := windows.CreateEvent(nil, 0, 0, nil)
+       if err != nil {
+               windows.CloseHandle(handle)
+               return nil, err
+       }
+
+       if err := efw.EbpfMapSetWaitHandle(fd, 0, handle); err != nil {
+               windows.CloseHandle(handle)
+               windows.CloseHandle(flushHandle)
+               return nil, err
+       }
+
+       return &poller{
+               handle:      handle,
+               flushHandle: flushHandle,
+               handles:     []windows.Handle{handle, flushHandle},
+       }, nil
+}
+
+// Returns [os.ErrDeadlineExceeded] if a deadline was set and no wakeup was received.
+// Returns [ErrFlushed] if the ring buffer was flushed manually.
+// Returns [os.ErrClosed] if the poller was closed.
+func (p *poller) Wait(deadline time.Time) error {
+       if p.closed.Load() {
+               return os.ErrClosed
+       }
+
+       timeout := uint32(windows.INFINITE)
+       if !deadline.IsZero() {
+               timeout = uint32(internal.Between(time.Until(deadline).Milliseconds(), 0, windows.INFINITE-1))
+       }
+
+       // Wait for either the ring buffer handle or the flush handle to be signaled
+       result, err := windows.WaitForMultipleObjects(p.handles, false, timeout)
+       switch result {
+       case windows.WAIT_OBJECT_0:
+               // Ring buffer event
+               return nil
+       case windows.WAIT_OBJECT_0 + 1:
+               if p.closed.Load() {
+                       return os.ErrClosed
+               }
+               // Flush event
+               return ErrFlushed
+       case uint32(windows.WAIT_TIMEOUT):
+               return os.ErrDeadlineExceeded
+       case windows.WAIT_FAILED:
+               return err
+       default:
+               return fmt.Errorf("unexpected wait result 0x%x: %w", result, err)
+       }
+}
+
+// Flush interrupts [Wait] with [ErrFlushed].
+func (p *poller) Flush() error {
+       // Signal the handle to wake up any waiting threads
+       if err := windows.SetEvent(p.flushHandle); err != nil {
+               if errors.Is(err, windows.ERROR_INVALID_HANDLE) {
+                       return os.ErrClosed
+               }
+               return err
+       }
+
+       return nil
+}
+
+func (p *poller) Close() error {
+       p.closed.Store(true)
+
+       if err := p.Flush(); err != nil {
+               return err
+       }
+
+       return errors.Join(windows.CloseHandle(p.handle), windows.CloseHandle(p.flushHandle))
+}
index 40c5664885cdc241f84baacf18027e2934b46e7e..abb704cd5fb3206f4c08e0b5d5af009fb625af18 100644 (file)
@@ -1,61 +1,15 @@
-//go:build !windows
-
 package ringbuf
 
 import (
        "fmt"
        "io"
-       "os"
-       "runtime"
        "sync/atomic"
        "unsafe"
 
        "github.com/cilium/ebpf/internal"
        "github.com/cilium/ebpf/internal/sys"
-       "github.com/cilium/ebpf/internal/unix"
 )
 
-type ringbufEventRing struct {
-       prod []byte
-       cons []byte
-       *ringReader
-}
-
-func newRingBufEventRing(mapFD, size int) (*ringbufEventRing, error) {
-       cons, err := unix.Mmap(mapFD, 0, os.Getpagesize(), unix.PROT_READ|unix.PROT_WRITE, unix.MAP_SHARED)
-       if err != nil {
-               return nil, fmt.Errorf("can't mmap consumer page: %w", err)
-       }
-
-       prod, err := unix.Mmap(mapFD, (int64)(os.Getpagesize()), os.Getpagesize()+2*size, unix.PROT_READ, unix.MAP_SHARED)
-       if err != nil {
-               _ = unix.Munmap(cons)
-               return nil, fmt.Errorf("can't mmap data pages: %w", err)
-       }
-
-       cons_pos := (*uintptr)(unsafe.Pointer(&cons[0]))
-       prod_pos := (*uintptr)(unsafe.Pointer(&prod[0]))
-
-       ring := &ringbufEventRing{
-               prod:       prod,
-               cons:       cons,
-               ringReader: newRingReader(cons_pos, prod_pos, prod[os.Getpagesize():]),
-       }
-       runtime.SetFinalizer(ring, (*ringbufEventRing).Close)
-
-       return ring, nil
-}
-
-func (ring *ringbufEventRing) Close() {
-       runtime.SetFinalizer(ring, nil)
-
-       _ = unix.Munmap(ring.prod)
-       _ = unix.Munmap(ring.cons)
-
-       ring.prod = nil
-       ring.cons = nil
-}
-
 type ringReader struct {
        // These point into mmap'ed memory and must be accessed atomically.
        prod_pos, cons_pos *uintptr
diff --git a/ringbuf/ring_other.go b/ringbuf/ring_other.go
new file mode 100644 (file)
index 0000000..69f6b75
--- /dev/null
@@ -0,0 +1,56 @@
+//go:build !windows
+
+package ringbuf
+
+import (
+       "errors"
+       "fmt"
+       "os"
+       "runtime"
+       "unsafe"
+
+       "github.com/cilium/ebpf/internal/unix"
+)
+
+type ringbufEventRing struct {
+       prod []byte
+       cons []byte
+       *ringReader
+}
+
+func newRingBufEventRing(mapFD, size int) (*ringbufEventRing, error) {
+       cons, err := unix.Mmap(mapFD, 0, os.Getpagesize(), unix.PROT_READ|unix.PROT_WRITE, unix.MAP_SHARED)
+       if err != nil {
+               return nil, fmt.Errorf("mmap consumer page: %w", err)
+       }
+
+       prod, err := unix.Mmap(mapFD, (int64)(os.Getpagesize()), os.Getpagesize()+2*size, unix.PROT_READ, unix.MAP_SHARED)
+       if err != nil {
+               _ = unix.Munmap(cons)
+               return nil, fmt.Errorf("mmap data pages: %w", err)
+       }
+
+       cons_pos := (*uintptr)(unsafe.Pointer(&cons[0]))
+       prod_pos := (*uintptr)(unsafe.Pointer(&prod[0]))
+
+       ring := &ringbufEventRing{
+               prod:       prod,
+               cons:       cons,
+               ringReader: newRingReader(cons_pos, prod_pos, prod[os.Getpagesize():]),
+       }
+       runtime.SetFinalizer(ring, (*ringbufEventRing).Close)
+
+       return ring, nil
+}
+
+func (ring *ringbufEventRing) Close() error {
+       runtime.SetFinalizer(ring, nil)
+
+       prod, cons := ring.prod, ring.cons
+       ring.prod, ring.cons = nil, nil
+
+       return errors.Join(
+               unix.Munmap(prod),
+               unix.Munmap(cons),
+       )
+}
diff --git a/ringbuf/ring_windows.go b/ringbuf/ring_windows.go
new file mode 100644 (file)
index 0000000..6c695ae
--- /dev/null
@@ -0,0 +1,66 @@
+package ringbuf
+
+import (
+       "errors"
+       "fmt"
+       "runtime"
+       "unsafe"
+
+       "github.com/cilium/ebpf/internal/efw"
+       "github.com/cilium/ebpf/internal/sys"
+)
+
+type ringbufEventRing struct {
+       mapFd            *sys.FD
+       cons, prod, data *uint8
+       *ringReader
+}
+
+func newRingBufEventRing(mapFD, size int) (*ringbufEventRing, error) {
+       dupFd, err := efw.EbpfDuplicateFd(mapFD)
+       if err != nil {
+               return nil, fmt.Errorf("duplicate map fd: %w", err)
+       }
+
+       fd, err := sys.NewFD(dupFd)
+       if err != nil {
+               _ = efw.EbpfCloseFd(dupFd)
+               return nil, err
+       }
+
+       consPtr, prodPtr, dataPtr, dataLen, err := efw.EbpfRingBufferMapMapBuffer(dupFd)
+       if err != nil {
+               _ = fd.Close()
+               return nil, fmt.Errorf("map consumer page: %w", err)
+       }
+
+       if dataLen != efw.Size(size) {
+               _ = fd.Close()
+               return nil, fmt.Errorf("map data length mismatch: %d != %d", dataLen, size)
+       }
+
+       // consPtr and prodPtr are guaranteed to be page size aligned.
+       consPos := (*uintptr)(unsafe.Pointer(consPtr))
+       prodPos := (*uintptr)(unsafe.Pointer(prodPtr))
+       data := unsafe.Slice(dataPtr, dataLen*2)
+
+       ring := &ringbufEventRing{
+               mapFd:      fd,
+               cons:       consPtr,
+               prod:       prodPtr,
+               data:       dataPtr,
+               ringReader: newRingReader(consPos, prodPos, data),
+       }
+       runtime.SetFinalizer(ring, (*ringbufEventRing).Close)
+
+       return ring, nil
+}
+
+func (ring *ringbufEventRing) Close() error {
+       runtime.SetFinalizer(ring, nil)
+
+       return errors.Join(
+               efw.EbpfRingBufferMapUnmapBuffer(ring.mapFd.Int(), ring.cons, ring.prod, ring.data),
+               ring.mapFd.Close(),
+       )
+}