]> go.fuhry.dev Git - fsnotify.git/commitdiff
kqueue: less mutexes
authorNathan Youngman <git@nathany.com>
Sun, 14 Sep 2014 23:58:38 +0000 (17:58 -0600)
committerNathan Youngman <git@nathany.com>
Wed, 24 Sep 2014 21:34:04 +0000 (15:34 -0600)
closes #13

kqueue.go

index 9af190ca1b9925eaa8c235d660014f15e74cd676..4f37b49a74095bb02cea907335fd88a4c7a9768b 100644 (file)
--- a/kqueue.go
+++ b/kqueue.go
@@ -21,24 +21,17 @@ import (
 type Watcher struct {
        Events chan Event
        Errors chan error
+       done   chan bool // Channel for sending a "quit message" to the reader goroutine
 
-       kq              int               // File descriptor (as returned by the kqueue() syscall).
+       kq int // File descriptor (as returned by the kqueue() syscall).
+
+       mu              sync.Mutex        // Protects access to watcher data
        watches         map[string]int    // Map of watched file descriptors (key: path).
        externalWatches map[string]bool   // Map of watches added by user of the library.
        dirFlags        map[string]uint32 // Map of watched directories to fflags used in kqueue.
        paths           map[int]pathInfo  // Map file descriptors to path names for processing kqueue events.
        fileExists      map[string]bool   // Keep track of if we know this file exists (to stop duplicate create events).
-       done            chan bool         // Channel for sending a "quit message" to the reader goroutine
        isClosed        bool              // Set to true when Close() is first called
-
-       mu sync.Mutex // Mutex for the Watcher itself (isClosed).
-
-       wmut  sync.Mutex // Protects access to watches.
-       pmut  sync.Mutex // Protects access to paths.
-       ewmut sync.Mutex // Protects access to externalWatches.
-
-       dirmut sync.Mutex // Protects access to dirFlags.
-       femut  sync.Mutex // Protects access to fileExists.
 }
 
 type pathInfo struct {
@@ -81,9 +74,10 @@ func (w *Watcher) Close() error {
 
        // Send "quit" message to the reader goroutine:
        w.done <- true
-       w.wmut.Lock()
+
+       w.mu.Lock()
        ws := w.watches
-       w.wmut.Unlock()
+       w.mu.Unlock()
        for name := range ws {
                w.Remove(name)
        }
@@ -93,18 +87,18 @@ func (w *Watcher) Close() error {
 
 // Add starts watching the named file or directory (non-recursively).
 func (w *Watcher) Add(name string) error {
-       w.ewmut.Lock()
+       w.mu.Lock()
        w.externalWatches[name] = true
-       w.ewmut.Unlock()
+       w.mu.Unlock()
        return w.addWatch(name, noteAllEvents)
 }
 
 // Remove stops watching the the named file or directory (non-recursively).
 func (w *Watcher) Remove(name string) error {
        name = filepath.Clean(name)
-       w.wmut.Lock()
+       w.mu.Lock()
        watchfd, ok := w.watches[name]
-       w.wmut.Unlock()
+       w.mu.Unlock()
        if !ok {
                return fmt.Errorf("can't remove non-existent kevent watch for: %s", name)
        }
@@ -116,32 +110,26 @@ func (w *Watcher) Remove(name string) error {
 
        syscall.Close(watchfd)
 
-       w.wmut.Lock()
-       delete(w.watches, name)
-       w.wmut.Unlock()
-       w.dirmut.Lock()
-       delete(w.dirFlags, name)
-       w.dirmut.Unlock()
-       w.pmut.Lock()
+       w.mu.Lock()
        isDir := w.paths[watchfd].isDir
+       delete(w.watches, name)
        delete(w.paths, watchfd)
-       w.pmut.Unlock()
+       delete(w.dirFlags, name)
+       w.mu.Unlock()
 
        // Find all watched paths that are in this directory that are not external.
        if isDir {
                var pathsToRemove []string
-               w.pmut.Lock()
+               w.mu.Lock()
                for _, path := range w.paths {
                        wdir, _ := filepath.Split(path.name)
-                       if filepath.Clean(wdir) == filepath.Clean(name) {
-                               w.ewmut.Lock()
+                       if filepath.Clean(wdir) == name {
                                if !w.externalWatches[path.name] {
                                        pathsToRemove = append(pathsToRemove, path.name)
                                }
-                               w.ewmut.Unlock()
                        }
                }
-               w.pmut.Unlock()
+               w.mu.Unlock()
                for _, name := range pathsToRemove {
                        // Since these are internal, not much sense in propagating error
                        // to the user, as that will just confuse them with an error about
@@ -162,34 +150,29 @@ var keventWaitTime = durationToTimespec(100 * time.Millisecond)
 // addWatch adds name to the watched file set.
 // The flags are interpreted as described in kevent(2).
 func (w *Watcher) addWatch(name string, flags uint32) error {
+       var isDir bool
+       // Make ./name and name equivalent
+       name = filepath.Clean(name)
+
        w.mu.Lock()
        if w.isClosed {
                w.mu.Unlock()
                return errors.New("kevent instance already closed")
        }
-       w.mu.Unlock()
-
-       // Make ./name and name equivalent
-       name = filepath.Clean(name)
-
-       w.wmut.Lock()
        watchfd, alreadyWatching := w.watches[name]
-       w.wmut.Unlock()
-
-       var isDir bool
-
+       // We already have a watch, but we can still override flags.
        if alreadyWatching {
-               // We already have a watch, but we can still override flags
-               w.pmut.Lock()
                isDir = w.paths[watchfd].isDir
-               w.pmut.Unlock()
-       } else {
+       }
+       w.mu.Unlock()
+
+       if !alreadyWatching {
                fi, err := os.Lstat(name)
                if err != nil {
                        return err
                }
 
-               // don't watch socket
+               // Don't watch sockets.
                if fi.Mode()&os.ModeSocket == os.ModeSocket {
                        return nil
                }
@@ -227,24 +210,21 @@ func (w *Watcher) addWatch(name string, flags uint32) error {
        }
 
        if !alreadyWatching {
-               w.wmut.Lock()
+               w.mu.Lock()
                w.watches[name] = watchfd
-               w.wmut.Unlock()
-
-               w.pmut.Lock()
                w.paths[watchfd] = pathInfo{name: name, isDir: isDir}
-               w.pmut.Unlock()
+               w.mu.Unlock()
        }
 
        if isDir {
                // Watch the directory if it has not been watched before,
                // or if it was watched before, but perhaps only a NOTE_DELETE (watchDirectoryFiles)
-               w.dirmut.Lock()
+               w.mu.Lock()
                watchDir := (flags&syscall.NOTE_WRITE) == syscall.NOTE_WRITE &&
                        (!alreadyWatching || (w.dirFlags[name]&syscall.NOTE_WRITE) != syscall.NOTE_WRITE)
                // Store flags so this watch can be updated later
                w.dirFlags[name] = flags
-               w.dirmut.Unlock()
+               w.mu.Unlock()
 
                if watchDir {
                        if err := w.watchDirectoryFiles(name); err != nil {
@@ -255,8 +235,8 @@ func (w *Watcher) addWatch(name string, flags uint32) error {
        return nil
 }
 
-// readEvents reads from the kqueue file descriptor, converts the
-// received events into Event objects and sends them via the Events channel
+// readEvents reads from kqueue and converts the received kevents into
+// Event values that it sends down the Events channel.
 func (w *Watcher) readEvents() {
        eventBuffer := make([]syscall.Kevent_t, 10)
 
@@ -264,9 +244,9 @@ func (w *Watcher) readEvents() {
                // See if there is a message on the "done" channel
                select {
                case <-w.done:
-                       errno := syscall.Close(w.kq)
-                       if errno != nil {
-                               w.Errors <- os.NewSyscallError("close", errno)
+                       err := syscall.Close(w.kq)
+                       if err != nil {
+                               w.Errors <- os.NewSyscallError("close", err)
                        }
                        close(w.Events)
                        close(w.Errors)
@@ -284,18 +264,16 @@ func (w *Watcher) readEvents() {
 
                // Flush the events we received to the Events channel
                for len(kevents) > 0 {
-                       watchEvent := &kevents[0]
-                       watchfd := int(watchEvent.Ident)
-                       mask := uint32(watchEvent.Fflags)
-
-                       w.pmut.Lock()
+                       kevent := &kevents[0]
+                       watchfd := int(kevent.Ident)
+                       mask := uint32(kevent.Fflags)
+                       w.mu.Lock()
                        path := w.paths[watchfd]
-                       w.pmut.Unlock()
-
+                       w.mu.Unlock()
                        event := newEvent(path.name, mask)
 
                        if path.isDir && !(event.Op&Remove == Remove) {
-                               // Double check to make sure the directory exist. This can happen when
+                               // Double check to make sure the directory exists. This can happen when
                                // we do a rm -fr on a recursively watched folders and we receive a
                                // modification event first but the folder has been deleted and later
                                // receive the delete event
@@ -312,28 +290,20 @@ func (w *Watcher) readEvents() {
                                w.Events <- event
                        }
 
-                       // Move to next event
-                       kevents = kevents[1:]
-
-                       if event.Op&Rename == Rename {
+                       if event.Op&Rename == Rename || event.Op&Remove == Remove {
                                w.Remove(event.Name)
-                               w.femut.Lock()
+                               w.mu.Lock()
                                delete(w.fileExists, event.Name)
-                               w.femut.Unlock()
+                               w.mu.Unlock()
                        }
                        if event.Op&Remove == Remove {
-                               w.Remove(event.Name)
-                               w.femut.Lock()
-                               delete(w.fileExists, event.Name)
-                               w.femut.Unlock()
-
                                // Look for a file that may have overwritten this.
                                // For example, mv f1 f2 will delete f2, then create f2.
                                fileDir, _ := filepath.Split(event.Name)
                                fileDir = filepath.Clean(fileDir)
-                               w.wmut.Lock()
+                               w.mu.Lock()
                                _, found := w.watches[fileDir]
-                               w.wmut.Unlock()
+                               w.mu.Unlock()
                                if found {
                                        // make sure the directory exists before we watch for changes. When we
                                        // do a recursive watch and perform rm -fr, the parent directory might
@@ -345,6 +315,9 @@ func (w *Watcher) readEvents() {
                                        }
                                }
                        }
+
+                       // Move to next event
+                       kevents = kevents[1:]
                }
        }
 }
@@ -385,9 +358,9 @@ func (w *Watcher) watchDirectoryFiles(dirPath string) error {
                        return err
                }
 
-               w.femut.Lock()
+               w.mu.Lock()
                w.fileExists[filePath] = true
-               w.femut.Unlock()
+               w.mu.Unlock()
        }
 
        return nil
@@ -407,9 +380,9 @@ func (w *Watcher) sendDirectoryChangeEvents(dirPath string) {
        // Search for new files
        for _, fileInfo := range files {
                filePath := filepath.Join(dirPath, fileInfo.Name())
-               w.femut.Lock()
+               w.mu.Lock()
                _, doesExist := w.fileExists[filePath]
-               w.femut.Unlock()
+               w.mu.Unlock()
                if !doesExist {
                        // Send create event
                        w.Events <- newCreateEvent(filePath)
@@ -420,9 +393,9 @@ func (w *Watcher) sendDirectoryChangeEvents(dirPath string) {
                        return
                }
 
-               w.femut.Lock()
+               w.mu.Lock()
                w.fileExists[filePath] = true
-               w.femut.Unlock()
+               w.mu.Unlock()
        }
 }
 
@@ -430,9 +403,9 @@ func (w *Watcher) internalWatch(name string, fileInfo os.FileInfo) error {
        if fileInfo.IsDir() {
                // mimic Linux providing delete events for subdirectories
                // but preserve the flags used if currently watching subdirectory
-               w.dirmut.Lock()
+               w.mu.Lock()
                flags := w.dirFlags[name]
-               w.dirmut.Unlock()
+               w.mu.Unlock()
 
                flags |= syscall.NOTE_DELETE
                return w.addWatch(name, flags)
@@ -469,7 +442,7 @@ func register(kq int, fds []int, flags int, fflags uint32) error {
        return nil
 }
 
-// read retrieves pending events
+// read retrieves pending events, or waits until an event occurs.
 // A timeout of nil blocks indefinitely, while 0 polls the queue.
 func read(kq int, events []syscall.Kevent_t, timeout *syscall.Timespec) ([]syscall.Kevent_t, error) {
        n, err := syscall.Kevent(kq, nil, events, timeout)