import (
"errors"
"fmt"
+ "io"
"os"
"path/filepath"
"strings"
// Watcher watches a set of files, delivering events to a channel.
type Watcher struct {
- Events chan Event
- Errors chan error
- mu sync.Mutex // Map access
- fd int // File descriptor (as returned by the inotify_init() syscall)
- watches map[string]*watch // Map of inotify watches (key: path)
- paths map[int]string // Map of watched paths (key: watch descriptor)
- done chan struct{} // Channel for sending a "quit message" to the reader goroutine
+ Events chan Event
+ Errors chan error
+ mu sync.Mutex // Map access
+ poller *fdPoller
+ watches map[string]*watch // Map of inotify watches (key: path)
+ paths map[int]string // Map of watched paths (key: watch descriptor)
+ done chan struct{} // Channel for sending a "quit message" to the reader goroutine
+ doneresp chan struct{} // Channel to respond to Close
}
// NewWatcher establishes a new watcher with the underlying OS and begins waiting for events.
func NewWatcher() (*Watcher, error) {
- fd, errno := syscall.InotifyInit()
- if fd == -1 {
- return nil, os.NewSyscallError("inotify_init", errno)
+ poller, err := newFdPoller()
+ if err != nil {
+ return nil, err
}
w := &Watcher{
- fd: fd,
- watches: make(map[string]*watch),
- paths: make(map[int]string),
- Events: make(chan Event),
- Errors: make(chan error),
- done: make(chan struct{}),
+ poller: poller,
+ watches: make(map[string]*watch),
+ paths: make(map[int]string),
+ Events: make(chan Event),
+ Errors: make(chan error),
+ done: make(chan struct{}),
+ doneresp: make(chan struct{}),
}
go w.readEvents()
// Send 'close' signal to goroutine, and set the Watcher to closed.
close(w.done)
- // Remove all watches.
- // Everything after this may generate errors because the inotify channel
- // has been closed; we don't care.
- numWatches := w.removeAll()
-
- // If no watches were removed, it's possible syscall.Read is still blocking.
- // In this case, create a watch and remove it to wake it up.
- // If that fails, there's really nothing left to do. we've done our best,
- // but the goroutine may be alive forever.
- if numWatches == 0 {
- wd, _ := syscall.InotifyAddWatch(w.fd, ".", syscall.IN_DELETE_SELF)
- syscall.InotifyRmWatch(w.fd, uint32(wd))
- }
+ // Wake up goroutine
+ w.poller.wake()
+
+ // Wait for goroutine to close
+ <-w.doneresp
return nil
}
watchEntry.flags |= flags
flags |= syscall.IN_MASK_ADD
}
- wd, errno := syscall.InotifyAddWatch(w.fd, name, flags)
+ wd, errno := syscall.InotifyAddWatch(w.poller.fd, name, flags)
if wd == -1 {
return os.NewSyscallError("inotify_add_watch", errno)
}
if !ok {
return fmt.Errorf("can't remove non-existent inotify watch for: %s", name)
}
- success, errno := syscall.InotifyRmWatch(w.fd, watch.wd)
+ success, errno := syscall.InotifyRmWatch(w.poller.fd, watch.wd)
if success == -1 {
return os.NewSyscallError("inotify_rm_watch", errno)
}
return nil
}
-// removeAll watches
-func (w *Watcher) removeAll() int {
- removed := 0
- w.mu.Lock()
- defer w.mu.Unlock()
- for name, watch := range w.watches {
- success, _ := syscall.InotifyRmWatch(w.fd, watch.wd)
- if success != -1 {
- removed++
- }
- delete(w.watches, name)
- }
- return removed
-}
-
type watch struct {
wd uint32 // Watch descriptor (as returned by the inotify_add_watch() syscall)
flags uint32 // inotify flags of this watch (see inotify(7) for the list of valid flags)
buf [syscall.SizeofInotifyEvent * 4096]byte // Buffer for a maximum of 4096 raw events
n int // Number of bytes read with read()
errno error // Syscall errno
+ ok bool // For poller.wait
)
+ defer close(w.doneresp)
defer close(w.Errors)
defer close(w.Events)
- defer syscall.Close(w.fd)
+ defer w.poller.close()
for {
// See if we have been closed.
return
}
- n, errno = syscall.Read(w.fd, buf[:])
+ ok, errno = w.poller.wait()
+ if errno != nil {
+ select {
+ case w.Errors <- errno:
+ case <-w.done:
+ return
+ }
+ continue
+ }
+
+ if !ok {
+ continue
+ }
+
+ n, errno = syscall.Read(w.poller.fd, buf[:])
// If a signal interrupted execution, see if we've been asked to close, and try again.
// http://man7.org/linux/man-pages/man7/signal.7.html :
// "Before Linux 3.8, reads from an inotify(7) file descriptor were not restartable"
return
}
- // If EOF is received
+ // If EOF is received. This should really never happen.
if n == 0 {
- close(w.done)
- return
+ select {
+ case w.Errors <- io.EOF:
+ case <-w.done:
+ return
+ }
+ continue
}
if n < 0 {
--- /dev/null
+// Copyright 2015 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build linux
+
+package fsnotify
+
+import (
+ "errors"
+ "os"
+ "syscall"
+)
+
+type fdPoller struct {
+ fd int // File descriptor (as returned by the inotify_init() syscall)
+ epfd int // Epoll file descriptor
+ pipe [2]int // Pipe for waking up
+}
+
+// Create a new inotify poller.
+// This creates an inotify handler, and an epoll handler.
+func newFdPoller() (*fdPoller, error) {
+ var errno error
+ poller := new(fdPoller)
+
+ // Create inotify fd
+ poller.fd, errno = syscall.InotifyInit()
+ if poller.fd == -1 {
+ return nil, os.NewSyscallError("inotify_init", errno)
+ }
+ // Create epoll fd
+ poller.epfd, errno = syscall.EpollCreate(1)
+ if poller.epfd == -1 {
+ syscall.Close(poller.fd)
+ return nil, os.NewSyscallError("epoll_create", errno)
+ }
+ // Create pipe; pipe[0] is the read end, pipe[1] the write end.
+ errno = syscall.Pipe(poller.pipe[:])
+ if errno != nil {
+ syscall.Close(poller.fd)
+ syscall.Close(poller.epfd)
+ return nil, os.NewSyscallError("pipe", errno)
+ }
+
+ // Register inotify fd with epoll
+ event := syscall.EpollEvent{
+ Fd: int32(poller.fd),
+ Events: syscall.EPOLLIN,
+ }
+ errno = syscall.EpollCtl(poller.epfd, syscall.EPOLL_CTL_ADD, poller.fd, &event)
+ if errno != nil {
+ syscall.Close(poller.fd)
+ syscall.Close(poller.epfd)
+ syscall.Close(poller.pipe[0])
+ syscall.Close(poller.pipe[1])
+ return nil, os.NewSyscallError("epoll_ctl", errno)
+ }
+
+ // Register pipe fd with epoll
+ event = syscall.EpollEvent{
+ Fd: int32(poller.pipe[0]),
+ Events: syscall.EPOLLIN,
+ }
+ errno = syscall.EpollCtl(poller.epfd, syscall.EPOLL_CTL_ADD, poller.pipe[0], &event)
+ if errno != nil {
+ syscall.Close(poller.fd)
+ syscall.Close(poller.epfd)
+ syscall.Close(poller.pipe[0])
+ syscall.Close(poller.pipe[1])
+ return nil, os.NewSyscallError("epoll_ctl", errno)
+ }
+
+ return poller, nil
+}
+
+// Wait using epoll, then read from inotify.
+// Returns true if something is ready to be read,
+// false if there is not.
+func (poller *fdPoller) wait() (bool, error) {
+ events := make([]syscall.EpollEvent, 7)
+ for {
+ n, errno := syscall.EpollWait(poller.epfd, events, -1)
+ if n == -1 {
+ if errno == syscall.EINTR {
+ continue
+ }
+ return false, os.NewSyscallError("epoll_wait", errno)
+ }
+ if n == 0 {
+ // If there are no events, try again.
+ continue
+ }
+ if n > 6 {
+ // This should never happen.
+ return false, errors.New("epoll_wait returned more events than I know what to do with")
+ }
+ ready := events[:n]
+ epollhup := false
+ epollerr := false
+ epollin := false
+ epollpipehup := false
+ epollpipein := false
+ for _, event := range ready {
+ if event.Fd == int32(poller.fd) {
+ if event.Events&syscall.EPOLLHUP != 0 {
+ // This should not happen, but if it does, treat it as a wakeup.
+ epollhup = true
+ }
+ if event.Events&syscall.EPOLLERR != 0 {
+ // If an error is waiting on the file descriptor, we should pretend
+ // something is ready to read, and let syscall.Read pick up the error.
+ epollerr = true
+ }
+ if event.Events&syscall.EPOLLIN != 0 {
+ // There is data to read.
+ epollin = true
+ }
+ }
+ if event.Fd == int32(poller.pipe[0]) {
+ if event.Events&syscall.EPOLLHUP != 0 {
+ // Write pipe descriptor was closed, by us. This means we're closing down the
+ // watcher, and we should wake up.
+ epollpipehup = true
+ }
+ if event.Events&syscall.EPOLLERR != 0 {
+ // If an error is waiting on the pipe file descriptor.
+ // This is an absolute mystery, and should never ever happen.
+ return false, errors.New("Error on the pipe descriptor.")
+ }
+ if event.Events&syscall.EPOLLIN != 0 {
+ // This is a regular wakeup.
+ epollpipein = true
+ // Clear the buffer.
+ err := poller.clearWake()
+ if err != nil {
+ return false, err
+ }
+ }
+ }
+ }
+
+ if epollerr {
+ return true, nil
+ }
+ if epollhup || epollpipehup || epollpipein {
+ return false, nil
+ }
+ if epollin {
+ return true, nil
+ }
+ return false, errors.New("Epoll failed to generate any of the only six possibilities.")
+ }
+}
+
+// Close the write end of the poller.
+func (poller *fdPoller) wake() error {
+ buf := make([]byte, 1)
+ n, errno := syscall.Write(poller.pipe[1], buf)
+ if n == -1 {
+ return os.NewSyscallError("write", errno)
+ }
+ return nil
+}
+
+func (poller *fdPoller) clearWake() error {
+ buf := make([]byte, 100)
+ n, errno := syscall.Read(poller.pipe[0], buf)
+ if n == -1 {
+ return os.NewSyscallError("read", errno)
+ }
+ return nil
+}
+
+// Close all file descriptors.
+func (poller *fdPoller) close() {
+ syscall.Close(poller.pipe[1])
+ syscall.Close(poller.pipe[0])
+ syscall.Close(poller.fd)
+ syscall.Close(poller.epfd)
+}
t.Fatalf("w.Events would have blocked; readEvents is still alive!")
}
}
+
+func TestInotifyCloseCreate(t *testing.T) {
+ testDir := tempMkdir(t)
+ defer os.RemoveAll(testDir)
+
+ w, err := NewWatcher()
+ if err != nil {
+ t.Fatalf("Failed to create watcher: %v", err)
+ }
+ defer w.Close()
+
+ err = w.Add(testDir)
+ if err != nil {
+ t.Fatalf("Failed to add testDir: %v", err)
+ }
+ h, err := os.Create(filepath.Join(testDir, "testfile"))
+ if err != nil {
+ t.Fatalf("Failed to create file in testdir: %v", err)
+ }
+ h.Close()
+ select {
+ case _ = <-w.Events:
+ case err := <-w.Errors:
+ t.Fatalf("Error from watcher: %v", err)
+ case <-time.After(50 * time.Millisecond):
+ t.Fatalf("Took too long to wait for event")
+ }
+
+ // At this point, we've received one event, so the goroutine is ready.
+ // It's also blocking on syscall.Read.
+ // Now we try to swap the file descriptor under its nose.
+ w.Close()
+ w, err = NewWatcher()
+ if err != nil {
+ t.Fatalf("Failed to create second watcher: %v", err)
+ }
+
+ <-time.After(50 * time.Millisecond)
+ err = w.Add(testDir)
+ if err != nil {
+ t.Fatalf("Error adding testDir again: %v", err)
+ }
+}