]> go.fuhry.dev Git - fsnotify.git/commitdiff
Replace Use of Kthread-blocking Epoll with Poller Read, Remove Per-Event LStats on...
authorOtoMAN <65930041+horahoradev@users.noreply.github.com>
Sun, 24 Jul 2022 10:50:47 +0000 (03:50 -0700)
committerGitHub <noreply@github.com>
Sun, 24 Jul 2022 10:50:47 +0000 (12:50 +0200)
* Replaced use of raw epoll with netpoller read

* Remove Debian 6 Vagrant test; it's in #469 now

* Added ignoreLinux lstats back in

* Update test

Co-authored-by: Martin Tournoij <martin@arp242.net>
inotify.go
inotify_poller.go [deleted file]
inotify_poller_test.go [deleted file]
inotify_test.go

index b01124a63203371eb7ad5356b966bc8a880830a6..a42da583361a6c6a0269053dfb738e8032bc2442 100644 (file)
@@ -22,39 +22,36 @@ import (
 
 // Watcher watches a set of files, delivering events to a channel.
 type Watcher struct {
-       Events   chan Event
-       Errors   chan error
-       mu       sync.Mutex // Map access
-       fd       int
-       poller   *fdPoller
-       watches  map[string]*watch // Map of inotify watches (key: path)
-       paths    map[int]string    // Map of watched paths (key: watch descriptor)
-       done     chan struct{}     // Channel for sending a "quit message" to the reader goroutine
-       doneResp chan struct{}     // Channel to respond to Close
+       fd          int // https://github.com/golang/go/issues/26439 can't call .Fd() on os.FIle or Read will no longer return on Close()
+       Events      chan Event
+       Errors      chan error
+       mu          sync.Mutex // Map access
+       inotifyFile *os.File
+       watches     map[string]*watch // Map of inotify watches (key: path)
+       paths       map[int]string    // Map of watched paths (key: watch descriptor)
+       done        chan struct{}     // Channel for sending a "quit message" to the reader goroutine
+       doneResp    chan struct{}     // Channel to respond to Close
 }
 
 // NewWatcher establishes a new watcher with the underlying OS and begins waiting for events.
 func NewWatcher() (*Watcher, error) {
        // Create inotify fd
-       fd, errno := unix.InotifyInit1(unix.IN_CLOEXEC)
+       // Need to set the FD to nonblocking mode in order for SetDeadline methods to work
+       // Otherwise, blocking i/o operations won't terminate on close
+       fd, errno := unix.InotifyInit1(unix.IN_CLOEXEC | unix.IN_NONBLOCK)
        if fd == -1 {
                return nil, errno
        }
-       // Create epoll
-       poller, err := newFdPoller(fd)
-       if err != nil {
-               unix.Close(fd)
-               return nil, err
-       }
+
        w := &Watcher{
-               fd:       fd,
-               poller:   poller,
-               watches:  make(map[string]*watch),
-               paths:    make(map[int]string),
-               Events:   make(chan Event),
-               Errors:   make(chan error),
-               done:     make(chan struct{}),
-               doneResp: make(chan struct{}),
+               fd:          fd,
+               inotifyFile: os.NewFile(uintptr(fd), ""),
+               watches:     make(map[string]*watch),
+               paths:       make(map[int]string),
+               Events:      make(chan Event),
+               Errors:      make(chan error),
+               done:        make(chan struct{}),
+               doneResp:    make(chan struct{}),
        }
 
        go w.readEvents()
@@ -82,8 +79,11 @@ func (w *Watcher) Close() error {
        close(w.done)
        w.mu.Unlock()
 
-       // Wake up goroutine
-       w.poller.wake()
+       // Causes any blocking reads to return with an error, provided the file still supports deadline operations
+       err := w.inotifyFile.Close()
+       if err != nil {
+               return err
+       }
 
        // Wait for goroutine to close
        <-w.doneResp
@@ -189,16 +189,12 @@ type watch struct {
 func (w *Watcher) readEvents() {
        var (
                buf   [unix.SizeofInotifyEvent * 4096]byte // Buffer for a maximum of 4096 raw events
-               n     int                                  // Number of bytes read with read()
                errno error                                // Syscall errno
-               ok    bool                                 // For poller.wait
        )
 
        defer close(w.doneResp)
        defer close(w.Errors)
        defer close(w.Events)
-       defer unix.Close(w.fd)
-       defer w.poller.close()
 
        for {
                // See if we have been closed.
@@ -206,33 +202,19 @@ func (w *Watcher) readEvents() {
                        return
                }
 
-               ok, errno = w.poller.wait()
-               if errno != nil {
+               n, err := w.inotifyFile.Read(buf[:])
+               switch {
+               case errors.Unwrap(err) == os.ErrClosed:
+                       return
+               case err != nil:
                        select {
-                       case w.Errors <- errno:
+                       case w.Errors <- err:
                        case <-w.done:
                                return
                        }
                        continue
                }
 
-               if !ok {
-                       continue
-               }
-
-               n, errno = unix.Read(w.fd, buf[:])
-               // If a signal interrupted execution, see if we've been asked to close, and try again.
-               // http://man7.org/linux/man-pages/man7/signal.7.html :
-               // "Before Linux 3.8, reads from an inotify(7) file descriptor were not restartable"
-               if errno == unix.EINTR {
-                       continue
-               }
-
-               // unix.Read might have been woken up by Close. If so, we're done.
-               if w.isClosed() {
-                       return
-               }
-
                if n < unix.SizeofInotifyEvent {
                        var err error
                        if n == 0 {
diff --git a/inotify_poller.go b/inotify_poller.go
deleted file mode 100644 (file)
index b572a37..0000000
+++ /dev/null
@@ -1,187 +0,0 @@
-// Copyright 2015 The Go Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-//go:build linux
-// +build linux
-
-package fsnotify
-
-import (
-       "errors"
-
-       "golang.org/x/sys/unix"
-)
-
-type fdPoller struct {
-       fd   int    // File descriptor (as returned by the inotify_init() syscall)
-       epfd int    // Epoll file descriptor
-       pipe [2]int // Pipe for waking up
-}
-
-func emptyPoller(fd int) *fdPoller {
-       poller := new(fdPoller)
-       poller.fd = fd
-       poller.epfd = -1
-       poller.pipe[0] = -1
-       poller.pipe[1] = -1
-       return poller
-}
-
-// Create a new inotify poller.
-// This creates an inotify handler, and an epoll handler.
-func newFdPoller(fd int) (*fdPoller, error) {
-       var errno error
-       poller := emptyPoller(fd)
-       defer func() {
-               if errno != nil {
-                       poller.close()
-               }
-       }()
-
-       // Create epoll fd
-       poller.epfd, errno = unix.EpollCreate1(unix.EPOLL_CLOEXEC)
-       if poller.epfd == -1 {
-               return nil, errno
-       }
-       // Create pipe; pipe[0] is the read end, pipe[1] the write end.
-       errno = unix.Pipe2(poller.pipe[:], unix.O_NONBLOCK|unix.O_CLOEXEC)
-       if errno != nil {
-               return nil, errno
-       }
-
-       // Register inotify fd with epoll
-       event := unix.EpollEvent{
-               Fd:     int32(poller.fd),
-               Events: unix.EPOLLIN,
-       }
-       errno = unix.EpollCtl(poller.epfd, unix.EPOLL_CTL_ADD, poller.fd, &event)
-       if errno != nil {
-               return nil, errno
-       }
-
-       // Register pipe fd with epoll
-       event = unix.EpollEvent{
-               Fd:     int32(poller.pipe[0]),
-               Events: unix.EPOLLIN,
-       }
-       errno = unix.EpollCtl(poller.epfd, unix.EPOLL_CTL_ADD, poller.pipe[0], &event)
-       if errno != nil {
-               return nil, errno
-       }
-
-       return poller, nil
-}
-
-// Wait using epoll.
-// Returns true if something is ready to be read,
-// false if there is not.
-func (poller *fdPoller) wait() (bool, error) {
-       // 3 possible events per fd, and 2 fds, makes a maximum of 6 events.
-       // I don't know whether epoll_wait returns the number of events returned,
-       // or the total number of events ready.
-       // I decided to catch both by making the buffer one larger than the maximum.
-       events := make([]unix.EpollEvent, 7)
-       for {
-               n, errno := unix.EpollWait(poller.epfd, events, -1)
-               if n == -1 {
-                       if errno == unix.EINTR {
-                               continue
-                       }
-                       return false, errno
-               }
-               if n == 0 {
-                       // If there are no events, try again.
-                       continue
-               }
-               if n > 6 {
-                       // This should never happen. More events were returned than should be possible.
-                       return false, errors.New("epoll_wait returned more events than I know what to do with")
-               }
-               ready := events[:n]
-               epollhup := false
-               epollerr := false
-               epollin := false
-               for _, event := range ready {
-                       if event.Fd == int32(poller.fd) {
-                               if event.Events&unix.EPOLLHUP != 0 {
-                                       // This should not happen, but if it does, treat it as a wakeup.
-                                       epollhup = true
-                               }
-                               if event.Events&unix.EPOLLERR != 0 {
-                                       // If an error is waiting on the file descriptor, we should pretend
-                                       // something is ready to read, and let unix.Read pick up the error.
-                                       epollerr = true
-                               }
-                               if event.Events&unix.EPOLLIN != 0 {
-                                       // There is data to read.
-                                       epollin = true
-                               }
-                       }
-                       if event.Fd == int32(poller.pipe[0]) {
-                               if event.Events&unix.EPOLLHUP != 0 {
-                                       // Write pipe descriptor was closed, by us. This means we're closing down the
-                                       // watcher, and we should wake up.
-                               }
-                               if event.Events&unix.EPOLLERR != 0 {
-                                       // If an error is waiting on the pipe file descriptor.
-                                       // This is an absolute mystery, and should never ever happen.
-                                       return false, errors.New("Error on the pipe descriptor.")
-                               }
-                               if event.Events&unix.EPOLLIN != 0 {
-                                       // This is a regular wakeup, so we have to clear the buffer.
-                                       err := poller.clearWake()
-                                       if err != nil {
-                                               return false, err
-                                       }
-                               }
-                       }
-               }
-
-               if epollhup || epollerr || epollin {
-                       return true, nil
-               }
-               return false, nil
-       }
-}
-
-// Close the write end of the poller.
-func (poller *fdPoller) wake() error {
-       buf := make([]byte, 1)
-       n, errno := unix.Write(poller.pipe[1], buf)
-       if n == -1 {
-               if errno == unix.EAGAIN {
-                       // Buffer is full, poller will wake.
-                       return nil
-               }
-               return errno
-       }
-       return nil
-}
-
-func (poller *fdPoller) clearWake() error {
-       // You have to be woken up a LOT in order to get to 100!
-       buf := make([]byte, 100)
-       n, errno := unix.Read(poller.pipe[0], buf)
-       if n == -1 {
-               if errno == unix.EAGAIN {
-                       // Buffer is empty, someone else cleared our wake.
-                       return nil
-               }
-               return errno
-       }
-       return nil
-}
-
-// Close all poller file descriptors, but not the one passed to it.
-func (poller *fdPoller) close() {
-       if poller.pipe[1] != -1 {
-               unix.Close(poller.pipe[1])
-       }
-       if poller.pipe[0] != -1 {
-               unix.Close(poller.pipe[0])
-       }
-       if poller.epfd != -1 {
-               unix.Close(poller.epfd)
-       }
-}
diff --git a/inotify_poller_test.go b/inotify_poller_test.go
deleted file mode 100644 (file)
index 110e00d..0000000
+++ /dev/null
@@ -1,234 +0,0 @@
-// Copyright 2015 The Go Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-//go:build linux
-// +build linux
-
-package fsnotify
-
-import (
-       "testing"
-       "time"
-
-       "golang.org/x/sys/unix"
-)
-
-type testFd [2]int
-
-func makeTestFd(t *testing.T) testFd {
-       var tfd testFd
-       errno := unix.Pipe(tfd[:])
-       if errno != nil {
-               t.Fatalf("Failed to create pipe: %v", errno)
-       }
-       return tfd
-}
-
-func (tfd testFd) fd() int {
-       return tfd[0]
-}
-
-func (tfd testFd) closeWrite(t *testing.T) {
-       errno := unix.Close(tfd[1])
-       if errno != nil {
-               t.Fatalf("Failed to close write end of pipe: %v", errno)
-       }
-}
-
-func (tfd testFd) put(t *testing.T) {
-       buf := make([]byte, 10)
-       _, errno := unix.Write(tfd[1], buf)
-       if errno != nil {
-               t.Fatalf("Failed to write to pipe: %v", errno)
-       }
-}
-
-func (tfd testFd) get(t *testing.T) {
-       buf := make([]byte, 10)
-       _, errno := unix.Read(tfd[0], buf)
-       if errno != nil {
-               t.Fatalf("Failed to read from pipe: %v", errno)
-       }
-}
-
-func (tfd testFd) close() {
-       unix.Close(tfd[1])
-       unix.Close(tfd[0])
-}
-
-func makePoller(t *testing.T) (testFd, *fdPoller) {
-       tfd := makeTestFd(t)
-       poller, err := newFdPoller(tfd.fd())
-       if err != nil {
-               t.Fatalf("Failed to create poller: %v", err)
-       }
-       return tfd, poller
-}
-
-func TestPollerWithBadFd(t *testing.T) {
-       _, err := newFdPoller(-1)
-       if err != unix.EBADF {
-               t.Fatalf("Expected EBADF, got: %v", err)
-       }
-}
-
-func TestPollerWithData(t *testing.T) {
-       tfd, poller := makePoller(t)
-       defer tfd.close()
-       defer poller.close()
-
-       tfd.put(t)
-       ok, err := poller.wait()
-       if err != nil {
-               t.Fatalf("poller failed: %v", err)
-       }
-       if !ok {
-               t.Fatalf("expected poller to return true")
-       }
-       tfd.get(t)
-}
-
-func TestPollerWithWakeup(t *testing.T) {
-       tfd, poller := makePoller(t)
-       defer tfd.close()
-       defer poller.close()
-
-       err := poller.wake()
-       if err != nil {
-               t.Fatalf("wake failed: %v", err)
-       }
-       ok, err := poller.wait()
-       if err != nil {
-               t.Fatalf("poller failed: %v", err)
-       }
-       if ok {
-               t.Fatalf("expected poller to return false")
-       }
-}
-
-func TestPollerWithClose(t *testing.T) {
-       tfd, poller := makePoller(t)
-       defer tfd.close()
-       defer poller.close()
-
-       tfd.closeWrite(t)
-       ok, err := poller.wait()
-       if err != nil {
-               t.Fatalf("poller failed: %v", err)
-       }
-       if !ok {
-               t.Fatalf("expected poller to return true")
-       }
-}
-
-func TestPollerWithWakeupAndData(t *testing.T) {
-       tfd, poller := makePoller(t)
-       defer tfd.close()
-       defer poller.close()
-
-       tfd.put(t)
-       err := poller.wake()
-       if err != nil {
-               t.Fatalf("wake failed: %v", err)
-       }
-
-       // both data and wakeup
-       ok, err := poller.wait()
-       if err != nil {
-               t.Fatalf("poller failed: %v", err)
-       }
-       if !ok {
-               t.Fatalf("expected poller to return true")
-       }
-
-       // data is still in the buffer, wakeup is cleared
-       ok, err = poller.wait()
-       if err != nil {
-               t.Fatalf("poller failed: %v", err)
-       }
-       if !ok {
-               t.Fatalf("expected poller to return true")
-       }
-
-       tfd.get(t)
-       // data is gone, only wakeup now
-       err = poller.wake()
-       if err != nil {
-               t.Fatalf("wake failed: %v", err)
-       }
-       ok, err = poller.wait()
-       if err != nil {
-               t.Fatalf("poller failed: %v", err)
-       }
-       if ok {
-               t.Fatalf("expected poller to return false")
-       }
-}
-
-func TestPollerConcurrent(t *testing.T) {
-       tfd, poller := makePoller(t)
-       defer tfd.close()
-       defer poller.close()
-
-       oks := make(chan bool)
-       live := make(chan bool)
-       defer close(live)
-       go func() {
-               defer close(oks)
-               for {
-                       ok, err := poller.wait()
-                       if err != nil {
-                               t.Errorf("poller failed: %v", err)
-                       }
-                       oks <- ok
-                       if !<-live {
-                               return
-                       }
-               }
-       }()
-
-       // Try a write
-       select {
-       case <-time.After(50 * time.Millisecond):
-       case <-oks:
-               t.Fatalf("poller did not wait")
-       }
-       tfd.put(t)
-       if !<-oks {
-               t.Fatalf("expected true")
-       }
-       tfd.get(t)
-       live <- true
-
-       // Try a wakeup
-       select {
-       case <-time.After(50 * time.Millisecond):
-       case <-oks:
-               t.Fatalf("poller did not wait")
-       }
-       err := poller.wake()
-       if err != nil {
-               t.Fatalf("wake failed: %v", err)
-       }
-       if <-oks {
-               t.Fatalf("expected false")
-       }
-       live <- true
-
-       // Try a close
-       select {
-       case <-time.After(50 * time.Millisecond):
-       case <-oks:
-               t.Fatalf("poller did not wait")
-       }
-       tfd.closeWrite(t)
-       if !<-oks {
-               t.Fatalf("expected true")
-       }
-       tfd.get(t)
-
-       // wait for all goroutines for finish.
-       live <- false
-       <-oks
-}
index 269c3ff80a24ea17aad006c24bca1dd0038402a8..d9ee53ff0ce34f2031436e5129768fb4dc09aa6f 100644 (file)
@@ -550,3 +550,35 @@ func TestInotifyDeleteOpenedFile(t *testing.T) {
        fd.Close()
        checkEvent(Remove)
 }
+
+func TestINotifyNoBlockingSyscalls(t *testing.T) {
+       getThreads := func() int {
+               d := fmt.Sprintf("/proc/%d/task", os.Getpid())
+               ls, err := os.ReadDir(d)
+               if err != nil {
+                       t.Fatalf("reading %q: %s", d, err)
+               }
+               return len(ls)
+       }
+
+       w, err := NewWatcher()
+       if err != nil {
+               t.Fatalf("Failed to create watcher: %v", err)
+       }
+
+       startingThreads := getThreads()
+       // Call readEvents a bunch of times; if this function has a blocking raw syscall, it'll create many new kthreads
+       for i := 0; i <= 60; i++ {
+               go w.readEvents()
+       }
+
+       // Bad synchronization mechanism
+       time.Sleep(time.Second * 2)
+
+       endingThreads := getThreads()
+
+       // Did we spawn any new threads?
+       if diff := endingThreads - startingThreads; diff > 0 {
+               t.Fatalf("Got a nonzero diff %v. starting: %v. ending: %v", diff, startingThreads, endingThreads)
+       }
+}