From: Pieter Droogendijk Date: Sat, 7 Feb 2015 19:47:41 +0000 (+0100) Subject: Now use epoll so we can wake up readEvents X-Git-Tag: v1.7.2~236 X-Git-Url: https://go.fuhry.dev/?a=commitdiff_plain;h=e4fd8335543a7c8d3f108fb45f24c182b7058095;p=fsnotify.git Now use epoll so we can wake up readEvents --- diff --git a/inotify.go b/inotify.go index b61ae12..d2e671c 100644 --- a/inotify.go +++ b/inotify.go @@ -9,6 +9,7 @@ package fsnotify import ( "errors" "fmt" + "io" "os" "path/filepath" "strings" @@ -19,28 +20,30 @@ import ( // Watcher watches a set of files, delivering events to a channel. type Watcher struct { - Events chan Event - Errors chan error - mu sync.Mutex // Map access - fd int // File descriptor (as returned by the inotify_init() syscall) - watches map[string]*watch // Map of inotify watches (key: path) - paths map[int]string // Map of watched paths (key: watch descriptor) - done chan struct{} // Channel for sending a "quit message" to the reader goroutine + Events chan Event + Errors chan error + mu sync.Mutex // Map access + poller *fdPoller + watches map[string]*watch // Map of inotify watches (key: path) + paths map[int]string // Map of watched paths (key: watch descriptor) + done chan struct{} // Channel for sending a "quit message" to the reader goroutine + doneresp chan struct{} // Channel to respond to Close } // NewWatcher establishes a new watcher with the underlying OS and begins waiting for events. func NewWatcher() (*Watcher, error) { - fd, errno := syscall.InotifyInit() - if fd == -1 { - return nil, os.NewSyscallError("inotify_init", errno) + poller, err := newFdPoller() + if err != nil { + return nil, err } w := &Watcher{ - fd: fd, - watches: make(map[string]*watch), - paths: make(map[int]string), - Events: make(chan Event), - Errors: make(chan error), - done: make(chan struct{}), + poller: poller, + watches: make(map[string]*watch), + paths: make(map[int]string), + Events: make(chan Event), + Errors: make(chan error), + done: make(chan struct{}), + doneresp: make(chan struct{}), } go w.readEvents() @@ -65,19 +68,11 @@ func (w *Watcher) Close() error { // Send 'close' signal to goroutine, and set the Watcher to closed. close(w.done) - // Remove all watches. - // Everything after this may generate errors because the inotify channel - // has been closed; we don't care. - numWatches := w.removeAll() - - // If no watches were removed, it's possible syscall.Read is still blocking. - // In this case, create a watch and remove it to wake it up. - // If that fails, there's really nothing left to do. we've done our best, - // but the goroutine may be alive forever. - if numWatches == 0 { - wd, _ := syscall.InotifyAddWatch(w.fd, ".", syscall.IN_DELETE_SELF) - syscall.InotifyRmWatch(w.fd, uint32(wd)) - } + // Wake up goroutine + w.poller.wake() + + // Wait for goroutine to close + <-w.doneresp return nil } @@ -102,7 +97,7 @@ func (w *Watcher) Add(name string) error { watchEntry.flags |= flags flags |= syscall.IN_MASK_ADD } - wd, errno := syscall.InotifyAddWatch(w.fd, name, flags) + wd, errno := syscall.InotifyAddWatch(w.poller.fd, name, flags) if wd == -1 { return os.NewSyscallError("inotify_add_watch", errno) } @@ -128,7 +123,7 @@ func (w *Watcher) Remove(name string) error { if !ok { return fmt.Errorf("can't remove non-existent inotify watch for: %s", name) } - success, errno := syscall.InotifyRmWatch(w.fd, watch.wd) + success, errno := syscall.InotifyRmWatch(w.poller.fd, watch.wd) if success == -1 { return os.NewSyscallError("inotify_rm_watch", errno) } @@ -136,21 +131,6 @@ func (w *Watcher) Remove(name string) error { return nil } -// removeAll watches -func (w *Watcher) removeAll() int { - removed := 0 - w.mu.Lock() - defer w.mu.Unlock() - for name, watch := range w.watches { - success, _ := syscall.InotifyRmWatch(w.fd, watch.wd) - if success != -1 { - removed++ - } - delete(w.watches, name) - } - return removed -} - type watch struct { wd uint32 // Watch descriptor (as returned by the inotify_add_watch() syscall) flags uint32 // inotify flags of this watch (see inotify(7) for the list of valid flags) @@ -163,11 +143,13 @@ func (w *Watcher) readEvents() { buf [syscall.SizeofInotifyEvent * 4096]byte // Buffer for a maximum of 4096 raw events n int // Number of bytes read with read() errno error // Syscall errno + ok bool // For poller.wait ) + defer close(w.doneresp) defer close(w.Errors) defer close(w.Events) - defer syscall.Close(w.fd) + defer w.poller.close() for { // See if we have been closed. @@ -175,7 +157,21 @@ func (w *Watcher) readEvents() { return } - n, errno = syscall.Read(w.fd, buf[:]) + ok, errno = w.poller.wait() + if errno != nil { + select { + case w.Errors <- errno: + case <-w.done: + return + } + continue + } + + if !ok { + continue + } + + n, errno = syscall.Read(w.poller.fd, buf[:]) // If a signal interrupted execution, see if we've been asked to close, and try again. // http://man7.org/linux/man-pages/man7/signal.7.html : // "Before Linux 3.8, reads from an inotify(7) file descriptor were not restartable" @@ -188,10 +184,14 @@ func (w *Watcher) readEvents() { return } - // If EOF is received + // If EOF is received. This should really never happen. if n == 0 { - close(w.done) - return + select { + case w.Errors <- io.EOF: + case <-w.done: + return + } + continue } if n < 0 { diff --git a/inotify_poller.go b/inotify_poller.go new file mode 100644 index 0000000..8fe3ed7 --- /dev/null +++ b/inotify_poller.go @@ -0,0 +1,181 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build linux + +package fsnotify + +import ( + "errors" + "os" + "syscall" +) + +type fdPoller struct { + fd int // File descriptor (as returned by the inotify_init() syscall) + epfd int // Epoll file descriptor + pipe [2]int // Pipe for waking up +} + +// Create a new inotify poller. +// This creates an inotify handler, and an epoll handler. +func newFdPoller() (*fdPoller, error) { + var errno error + poller := new(fdPoller) + + // Create inotify fd + poller.fd, errno = syscall.InotifyInit() + if poller.fd == -1 { + return nil, os.NewSyscallError("inotify_init", errno) + } + // Create epoll fd + poller.epfd, errno = syscall.EpollCreate(1) + if poller.epfd == -1 { + syscall.Close(poller.fd) + return nil, os.NewSyscallError("epoll_create", errno) + } + // Create pipe; pipe[0] is the read end, pipe[1] the write end. + errno = syscall.Pipe(poller.pipe[:]) + if errno != nil { + syscall.Close(poller.fd) + syscall.Close(poller.epfd) + return nil, os.NewSyscallError("pipe", errno) + } + + // Register inotify fd with epoll + event := syscall.EpollEvent{ + Fd: int32(poller.fd), + Events: syscall.EPOLLIN, + } + errno = syscall.EpollCtl(poller.epfd, syscall.EPOLL_CTL_ADD, poller.fd, &event) + if errno != nil { + syscall.Close(poller.fd) + syscall.Close(poller.epfd) + syscall.Close(poller.pipe[0]) + syscall.Close(poller.pipe[1]) + return nil, os.NewSyscallError("epoll_ctl", errno) + } + + // Register pipe fd with epoll + event = syscall.EpollEvent{ + Fd: int32(poller.pipe[0]), + Events: syscall.EPOLLIN, + } + errno = syscall.EpollCtl(poller.epfd, syscall.EPOLL_CTL_ADD, poller.pipe[0], &event) + if errno != nil { + syscall.Close(poller.fd) + syscall.Close(poller.epfd) + syscall.Close(poller.pipe[0]) + syscall.Close(poller.pipe[1]) + return nil, os.NewSyscallError("epoll_ctl", errno) + } + + return poller, nil +} + +// Wait using epoll, then read from inotify. +// Returns true if something is ready to be read, +// false if there is not. +func (poller *fdPoller) wait() (bool, error) { + events := make([]syscall.EpollEvent, 7) + for { + n, errno := syscall.EpollWait(poller.epfd, events, -1) + if n == -1 { + if errno == syscall.EINTR { + continue + } + return false, os.NewSyscallError("epoll_wait", errno) + } + if n == 0 { + // If there are no events, try again. + continue + } + if n > 6 { + // This should never happen. + return false, errors.New("epoll_wait returned more events than I know what to do with") + } + ready := events[:n] + epollhup := false + epollerr := false + epollin := false + epollpipehup := false + epollpipein := false + for _, event := range ready { + if event.Fd == int32(poller.fd) { + if event.Events&syscall.EPOLLHUP != 0 { + // This should not happen, but if it does, treat it as a wakeup. + epollhup = true + } + if event.Events&syscall.EPOLLERR != 0 { + // If an error is waiting on the file descriptor, we should pretend + // something is ready to read, and let syscall.Read pick up the error. + epollerr = true + } + if event.Events&syscall.EPOLLIN != 0 { + // There is data to read. + epollin = true + } + } + if event.Fd == int32(poller.pipe[0]) { + if event.Events&syscall.EPOLLHUP != 0 { + // Write pipe descriptor was closed, by us. This means we're closing down the + // watcher, and we should wake up. + epollpipehup = true + } + if event.Events&syscall.EPOLLERR != 0 { + // If an error is waiting on the pipe file descriptor. + // This is an absolute mystery, and should never ever happen. + return false, errors.New("Error on the pipe descriptor.") + } + if event.Events&syscall.EPOLLIN != 0 { + // This is a regular wakeup. + epollpipein = true + // Clear the buffer. + err := poller.clearWake() + if err != nil { + return false, err + } + } + } + } + + if epollerr { + return true, nil + } + if epollhup || epollpipehup || epollpipein { + return false, nil + } + if epollin { + return true, nil + } + return false, errors.New("Epoll failed to generate any of the only six possibilities.") + } +} + +// Close the write end of the poller. +func (poller *fdPoller) wake() error { + buf := make([]byte, 1) + n, errno := syscall.Write(poller.pipe[1], buf) + if n == -1 { + return os.NewSyscallError("write", errno) + } + return nil +} + +func (poller *fdPoller) clearWake() error { + buf := make([]byte, 100) + n, errno := syscall.Read(poller.pipe[0], buf) + if n == -1 { + return os.NewSyscallError("read", errno) + } + return nil +} + +// Close all file descriptors. +func (poller *fdPoller) close() { + syscall.Close(poller.pipe[1]) + syscall.Close(poller.pipe[0]) + syscall.Close(poller.fd) + syscall.Close(poller.epfd) +} diff --git a/inotify_test.go b/inotify_test.go index 21aa924..1c26a29 100644 --- a/inotify_test.go +++ b/inotify_test.go @@ -106,3 +106,46 @@ func isWatcherReallyClosed(t *testing.T, w *Watcher) { t.Fatalf("w.Events would have blocked; readEvents is still alive!") } } + +func TestInotifyCloseCreate(t *testing.T) { + testDir := tempMkdir(t) + defer os.RemoveAll(testDir) + + w, err := NewWatcher() + if err != nil { + t.Fatalf("Failed to create watcher: %v", err) + } + defer w.Close() + + err = w.Add(testDir) + if err != nil { + t.Fatalf("Failed to add testDir: %v", err) + } + h, err := os.Create(filepath.Join(testDir, "testfile")) + if err != nil { + t.Fatalf("Failed to create file in testdir: %v", err) + } + h.Close() + select { + case _ = <-w.Events: + case err := <-w.Errors: + t.Fatalf("Error from watcher: %v", err) + case <-time.After(50 * time.Millisecond): + t.Fatalf("Took too long to wait for event") + } + + // At this point, we've received one event, so the goroutine is ready. + // It's also blocking on syscall.Read. + // Now we try to swap the file descriptor under its nose. + w.Close() + w, err = NewWatcher() + if err != nil { + t.Fatalf("Failed to create second watcher: %v", err) + } + + <-time.After(50 * time.Millisecond) + err = w.Add(testDir) + if err != nil { + t.Fatalf("Error adding testDir again: %v", err) + } +}