]> go.fuhry.dev Git - fsnotify.git/commitdiff
Now use epoll so we can wake up readEvents
authorPieter Droogendijk <pieter@binky.org.uk>
Sat, 7 Feb 2015 19:47:41 +0000 (20:47 +0100)
committerNathan Youngman <git@nathany.com>
Sun, 8 Feb 2015 20:22:30 +0000 (13:22 -0700)
inotify.go
inotify_poller.go [new file with mode: 0644]
inotify_test.go

index b61ae12423b08b9db8d9484f3f3ecff214098545..d2e671cb3118b18a169b31b222e300e79e9de4a5 100644 (file)
@@ -9,6 +9,7 @@ package fsnotify
 import (
        "errors"
        "fmt"
+       "io"
        "os"
        "path/filepath"
        "strings"
@@ -19,28 +20,30 @@ import (
 
 // Watcher watches a set of files, delivering events to a channel.
 type Watcher struct {
-       Events  chan Event
-       Errors  chan error
-       mu      sync.Mutex        // Map access
-       fd      int               // File descriptor (as returned by the inotify_init() syscall)
-       watches map[string]*watch // Map of inotify watches (key: path)
-       paths   map[int]string    // Map of watched paths (key: watch descriptor)
-       done    chan struct{}     // Channel for sending a "quit message" to the reader goroutine
+       Events   chan Event
+       Errors   chan error
+       mu       sync.Mutex // Map access
+       poller   *fdPoller
+       watches  map[string]*watch // Map of inotify watches (key: path)
+       paths    map[int]string    // Map of watched paths (key: watch descriptor)
+       done     chan struct{}     // Channel for sending a "quit message" to the reader goroutine
+       doneresp chan struct{}     // Channel to respond to Close
 }
 
 // NewWatcher establishes a new watcher with the underlying OS and begins waiting for events.
 func NewWatcher() (*Watcher, error) {
-       fd, errno := syscall.InotifyInit()
-       if fd == -1 {
-               return nil, os.NewSyscallError("inotify_init", errno)
+       poller, err := newFdPoller()
+       if err != nil {
+               return nil, err
        }
        w := &Watcher{
-               fd:      fd,
-               watches: make(map[string]*watch),
-               paths:   make(map[int]string),
-               Events:  make(chan Event),
-               Errors:  make(chan error),
-               done:    make(chan struct{}),
+               poller:   poller,
+               watches:  make(map[string]*watch),
+               paths:    make(map[int]string),
+               Events:   make(chan Event),
+               Errors:   make(chan error),
+               done:     make(chan struct{}),
+               doneresp: make(chan struct{}),
        }
 
        go w.readEvents()
@@ -65,19 +68,11 @@ func (w *Watcher) Close() error {
        // Send 'close' signal to goroutine, and set the Watcher to closed.
        close(w.done)
 
-       // Remove all watches.
-       // Everything after this may generate errors because the inotify channel
-       // has been closed; we don't care.
-       numWatches := w.removeAll()
-
-       // If no watches were removed, it's possible syscall.Read is still blocking.
-       // In this case, create a watch and remove it to wake it up.
-       // If that fails, there's really nothing left to do. we've done our best,
-       // but the goroutine may be alive forever.
-       if numWatches == 0 {
-               wd, _ := syscall.InotifyAddWatch(w.fd, ".", syscall.IN_DELETE_SELF)
-               syscall.InotifyRmWatch(w.fd, uint32(wd))
-       }
+       // Wake up goroutine
+       w.poller.wake()
+
+       // Wait for goroutine to close
+       <-w.doneresp
 
        return nil
 }
@@ -102,7 +97,7 @@ func (w *Watcher) Add(name string) error {
                watchEntry.flags |= flags
                flags |= syscall.IN_MASK_ADD
        }
-       wd, errno := syscall.InotifyAddWatch(w.fd, name, flags)
+       wd, errno := syscall.InotifyAddWatch(w.poller.fd, name, flags)
        if wd == -1 {
                return os.NewSyscallError("inotify_add_watch", errno)
        }
@@ -128,7 +123,7 @@ func (w *Watcher) Remove(name string) error {
        if !ok {
                return fmt.Errorf("can't remove non-existent inotify watch for: %s", name)
        }
-       success, errno := syscall.InotifyRmWatch(w.fd, watch.wd)
+       success, errno := syscall.InotifyRmWatch(w.poller.fd, watch.wd)
        if success == -1 {
                return os.NewSyscallError("inotify_rm_watch", errno)
        }
@@ -136,21 +131,6 @@ func (w *Watcher) Remove(name string) error {
        return nil
 }
 
-// removeAll watches
-func (w *Watcher) removeAll() int {
-       removed := 0
-       w.mu.Lock()
-       defer w.mu.Unlock()
-       for name, watch := range w.watches {
-               success, _ := syscall.InotifyRmWatch(w.fd, watch.wd)
-               if success != -1 {
-                       removed++
-               }
-               delete(w.watches, name)
-       }
-       return removed
-}
-
 type watch struct {
        wd    uint32 // Watch descriptor (as returned by the inotify_add_watch() syscall)
        flags uint32 // inotify flags of this watch (see inotify(7) for the list of valid flags)
@@ -163,11 +143,13 @@ func (w *Watcher) readEvents() {
                buf   [syscall.SizeofInotifyEvent * 4096]byte // Buffer for a maximum of 4096 raw events
                n     int                                     // Number of bytes read with read()
                errno error                                   // Syscall errno
+               ok    bool                                    // For poller.wait
        )
 
+       defer close(w.doneresp)
        defer close(w.Errors)
        defer close(w.Events)
-       defer syscall.Close(w.fd)
+       defer w.poller.close()
 
        for {
                // See if we have been closed.
@@ -175,7 +157,21 @@ func (w *Watcher) readEvents() {
                        return
                }
 
-               n, errno = syscall.Read(w.fd, buf[:])
+               ok, errno = w.poller.wait()
+               if errno != nil {
+                       select {
+                       case w.Errors <- errno:
+                       case <-w.done:
+                               return
+                       }
+                       continue
+               }
+
+               if !ok {
+                       continue
+               }
+
+               n, errno = syscall.Read(w.poller.fd, buf[:])
                // If a signal interrupted execution, see if we've been asked to close, and try again.
                // http://man7.org/linux/man-pages/man7/signal.7.html :
                // "Before Linux 3.8, reads from an inotify(7) file descriptor were not restartable"
@@ -188,10 +184,14 @@ func (w *Watcher) readEvents() {
                        return
                }
 
-               // If EOF is received
+               // If EOF is received. This should really never happen.
                if n == 0 {
-                       close(w.done)
-                       return
+                       select {
+                       case w.Errors <- io.EOF:
+                       case <-w.done:
+                               return
+                       }
+                       continue
                }
 
                if n < 0 {
diff --git a/inotify_poller.go b/inotify_poller.go
new file mode 100644 (file)
index 0000000..8fe3ed7
--- /dev/null
@@ -0,0 +1,181 @@
+// Copyright 2015 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build linux
+
+package fsnotify
+
+import (
+       "errors"
+       "os"
+       "syscall"
+)
+
+type fdPoller struct {
+       fd   int    // File descriptor (as returned by the inotify_init() syscall)
+       epfd int    // Epoll file descriptor
+       pipe [2]int // Pipe for waking up
+}
+
+// Create a new inotify poller.
+// This creates an inotify handler, and an epoll handler.
+func newFdPoller() (*fdPoller, error) {
+       var errno error
+       poller := new(fdPoller)
+
+       // Create inotify fd
+       poller.fd, errno = syscall.InotifyInit()
+       if poller.fd == -1 {
+               return nil, os.NewSyscallError("inotify_init", errno)
+       }
+       // Create epoll fd
+       poller.epfd, errno = syscall.EpollCreate(1)
+       if poller.epfd == -1 {
+               syscall.Close(poller.fd)
+               return nil, os.NewSyscallError("epoll_create", errno)
+       }
+       // Create pipe; pipe[0] is the read end, pipe[1] the write end.
+       errno = syscall.Pipe(poller.pipe[:])
+       if errno != nil {
+               syscall.Close(poller.fd)
+               syscall.Close(poller.epfd)
+               return nil, os.NewSyscallError("pipe", errno)
+       }
+
+       // Register inotify fd with epoll
+       event := syscall.EpollEvent{
+               Fd:     int32(poller.fd),
+               Events: syscall.EPOLLIN,
+       }
+       errno = syscall.EpollCtl(poller.epfd, syscall.EPOLL_CTL_ADD, poller.fd, &event)
+       if errno != nil {
+               syscall.Close(poller.fd)
+               syscall.Close(poller.epfd)
+               syscall.Close(poller.pipe[0])
+               syscall.Close(poller.pipe[1])
+               return nil, os.NewSyscallError("epoll_ctl", errno)
+       }
+
+       // Register pipe fd with epoll
+       event = syscall.EpollEvent{
+               Fd:     int32(poller.pipe[0]),
+               Events: syscall.EPOLLIN,
+       }
+       errno = syscall.EpollCtl(poller.epfd, syscall.EPOLL_CTL_ADD, poller.pipe[0], &event)
+       if errno != nil {
+               syscall.Close(poller.fd)
+               syscall.Close(poller.epfd)
+               syscall.Close(poller.pipe[0])
+               syscall.Close(poller.pipe[1])
+               return nil, os.NewSyscallError("epoll_ctl", errno)
+       }
+
+       return poller, nil
+}
+
+// Wait using epoll, then read from inotify.
+// Returns true if something is ready to be read,
+// false if there is not.
+func (poller *fdPoller) wait() (bool, error) {
+       events := make([]syscall.EpollEvent, 7)
+       for {
+               n, errno := syscall.EpollWait(poller.epfd, events, -1)
+               if n == -1 {
+                       if errno == syscall.EINTR {
+                               continue
+                       }
+                       return false, os.NewSyscallError("epoll_wait", errno)
+               }
+               if n == 0 {
+                       // If there are no events, try again.
+                       continue
+               }
+               if n > 6 {
+                       // This should never happen.
+                       return false, errors.New("epoll_wait returned more events than I know what to do with")
+               }
+               ready := events[:n]
+               epollhup := false
+               epollerr := false
+               epollin := false
+               epollpipehup := false
+               epollpipein := false
+               for _, event := range ready {
+                       if event.Fd == int32(poller.fd) {
+                               if event.Events&syscall.EPOLLHUP != 0 {
+                                       // This should not happen, but if it does, treat it as a wakeup.
+                                       epollhup = true
+                               }
+                               if event.Events&syscall.EPOLLERR != 0 {
+                                       // If an error is waiting on the file descriptor, we should pretend
+                                       // something is ready to read, and let syscall.Read pick up the error.
+                                       epollerr = true
+                               }
+                               if event.Events&syscall.EPOLLIN != 0 {
+                                       // There is data to read.
+                                       epollin = true
+                               }
+                       }
+                       if event.Fd == int32(poller.pipe[0]) {
+                               if event.Events&syscall.EPOLLHUP != 0 {
+                                       // Write pipe descriptor was closed, by us. This means we're closing down the
+                                       // watcher, and we should wake up.
+                                       epollpipehup = true
+                               }
+                               if event.Events&syscall.EPOLLERR != 0 {
+                                       // If an error is waiting on the pipe file descriptor.
+                                       // This is an absolute mystery, and should never ever happen.
+                                       return false, errors.New("Error on the pipe descriptor.")
+                               }
+                               if event.Events&syscall.EPOLLIN != 0 {
+                                       // This is a regular wakeup.
+                                       epollpipein = true
+                                       // Clear the buffer.
+                                       err := poller.clearWake()
+                                       if err != nil {
+                                               return false, err
+                                       }
+                               }
+                       }
+               }
+
+               if epollerr {
+                       return true, nil
+               }
+               if epollhup || epollpipehup || epollpipein {
+                       return false, nil
+               }
+               if epollin {
+                       return true, nil
+               }
+               return false, errors.New("Epoll failed to generate any of the only six possibilities.")
+       }
+}
+
+// Close the write end of the poller.
+func (poller *fdPoller) wake() error {
+       buf := make([]byte, 1)
+       n, errno := syscall.Write(poller.pipe[1], buf)
+       if n == -1 {
+               return os.NewSyscallError("write", errno)
+       }
+       return nil
+}
+
+func (poller *fdPoller) clearWake() error {
+       buf := make([]byte, 100)
+       n, errno := syscall.Read(poller.pipe[0], buf)
+       if n == -1 {
+               return os.NewSyscallError("read", errno)
+       }
+       return nil
+}
+
+// Close all file descriptors.
+func (poller *fdPoller) close() {
+       syscall.Close(poller.pipe[1])
+       syscall.Close(poller.pipe[0])
+       syscall.Close(poller.fd)
+       syscall.Close(poller.epfd)
+}
index 21aa924f1e0b5f204217c336cb70b33e1d943393..1c26a294740756b3f8011d3122b6c90c4dbb1b19 100644 (file)
@@ -106,3 +106,46 @@ func isWatcherReallyClosed(t *testing.T, w *Watcher) {
                t.Fatalf("w.Events would have blocked; readEvents is still alive!")
        }
 }
+
+func TestInotifyCloseCreate(t *testing.T) {
+       testDir := tempMkdir(t)
+       defer os.RemoveAll(testDir)
+
+       w, err := NewWatcher()
+       if err != nil {
+               t.Fatalf("Failed to create watcher: %v", err)
+       }
+       defer w.Close()
+
+       err = w.Add(testDir)
+       if err != nil {
+               t.Fatalf("Failed to add testDir: %v", err)
+       }
+       h, err := os.Create(filepath.Join(testDir, "testfile"))
+       if err != nil {
+               t.Fatalf("Failed to create file in testdir: %v", err)
+       }
+       h.Close()
+       select {
+       case _ = <-w.Events:
+       case err := <-w.Errors:
+               t.Fatalf("Error from watcher: %v", err)
+       case <-time.After(50 * time.Millisecond):
+               t.Fatalf("Took too long to wait for event")
+       }
+
+       // At this point, we've received one event, so the goroutine is ready.
+       // It's also blocking on syscall.Read.
+       // Now we try to swap the file descriptor under its nose.
+       w.Close()
+       w, err = NewWatcher()
+       if err != nil {
+               t.Fatalf("Failed to create second watcher: %v", err)
+       }
+
+       <-time.After(50 * time.Millisecond)
+       err = w.Add(testDir)
+       if err != nil {
+               t.Fatalf("Error adding testDir again: %v", err)
+       }
+}