]> go.fuhry.dev Git - fsnotify.git/commitdiff
Fix possible deadlock on closing the watcher on kqueue (#230)
authorAnmol Sethi <me@anmol.io>
Wed, 10 Jan 2018 04:19:26 +0000 (23:19 -0500)
committerNathan Youngman <git@nathany.com>
Wed, 10 Jan 2018 04:19:26 +0000 (21:19 -0700)
* avoid deadlocks on Close()

raw channel work (not inside a select) should
always be prohibited in production code, as
it readily causes deadlocks on shutdown.

Also adds the test TestWatcherClose from #145.
This request duplicates that test, with two
lines fixed to address the houndcli-bot review
concerns.

Fixes #187
Fixes #145

* cleanup and simpler test

* also fix #225

* fix tests

fsnotify_test.go
kqueue.go

index 9d6d72afc5bba202108b8ae33e31d848d76cba10..f9771d9dfe5a94a440d7454b0e60b939586dc6e7 100644 (file)
@@ -6,7 +6,11 @@
 
 package fsnotify
 
-import "testing"
+import (
+       "os"
+       "testing"
+       "time"
+)
 
 func TestEventStringWithValue(t *testing.T) {
        for opMask, expectedString := range map[Op]string{
@@ -38,3 +42,29 @@ func TestEventOpStringWithNoValue(t *testing.T) {
                t.Fatalf("Expected %s, got: %v", expectedOpString, event.Op.String())
        }
 }
+
+// TestWatcherClose tests that the goroutine started by creating the watcher can be
+// signalled to return at any time, even if there is no goroutine listening on the events
+// or errors channels.
+func TestWatcherClose(t *testing.T) {
+       t.Parallel()
+
+       name := tempMkFile(t, "")
+       w := newWatcher(t)
+       err := w.Add(name)
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       err = os.Remove(name)
+       if err != nil {
+               t.Fatal(err)
+       }
+       // Allow the watcher to receive the event.
+       time.Sleep(time.Millisecond * 100)
+
+       err = w.Close()
+       if err != nil {
+               t.Fatal(err)
+       }
+}
index c2b4acb18ddf28874ca3ff4d2ef34076b9c253a3..86e76a3d676836ae8857edb1ed507461c3558196 100644 (file)
--- a/kqueue.go
+++ b/kqueue.go
@@ -22,7 +22,7 @@ import (
 type Watcher struct {
        Events chan Event
        Errors chan error
-       done   chan bool // Channel for sending a "quit message" to the reader goroutine
+       done   chan struct{} // Channel for sending a "quit message" to the reader goroutine
 
        kq int // File descriptor (as returned by the kqueue() syscall).
 
@@ -56,7 +56,7 @@ func NewWatcher() (*Watcher, error) {
                externalWatches: make(map[string]bool),
                Events:          make(chan Event),
                Errors:          make(chan error),
-               done:            make(chan bool),
+               done:            make(chan struct{}),
        }
 
        go w.readEvents()
@@ -71,10 +71,8 @@ func (w *Watcher) Close() error {
                return nil
        }
        w.isClosed = true
-       w.mu.Unlock()
 
        // copy paths to remove while locked
-       w.mu.Lock()
        var pathsToRemove = make([]string, 0, len(w.watches))
        for name := range w.watches {
                pathsToRemove = append(pathsToRemove, name)
@@ -82,15 +80,12 @@ func (w *Watcher) Close() error {
        w.mu.Unlock()
        // unlock before calling Remove, which also locks
 
-       var err error
        for _, name := range pathsToRemove {
-               if e := w.Remove(name); e != nil && err == nil {
-                       err = e
-               }
+               w.Remove(name)
        }
 
-       // Send "quit" message to the reader goroutine:
-       w.done <- true
+       // send a "quit" message to the reader goroutine
+       close(w.done)
 
        return nil
 }
@@ -266,17 +261,12 @@ func (w *Watcher) addWatch(name string, flags uint32) (string, error) {
 func (w *Watcher) readEvents() {
        eventBuffer := make([]unix.Kevent_t, 10)
 
+loop:
        for {
                // See if there is a message on the "done" channel
                select {
                case <-w.done:
-                       err := unix.Close(w.kq)
-                       if err != nil {
-                               w.Errors <- err
-                       }
-                       close(w.Events)
-                       close(w.Errors)
-                       return
+                       break loop
                default:
                }
 
@@ -284,7 +274,11 @@ func (w *Watcher) readEvents() {
                kevents, err := read(w.kq, eventBuffer, &keventWaitTime)
                // EINTR is okay, the syscall was interrupted before timeout expired.
                if err != nil && err != unix.EINTR {
-                       w.Errors <- err
+                       select {
+                       case w.Errors <- err:
+                       case <-w.done:
+                               break loop
+                       }
                        continue
                }
 
@@ -319,8 +313,12 @@ func (w *Watcher) readEvents() {
                        if path.isDir && event.Op&Write == Write && !(event.Op&Remove == Remove) {
                                w.sendDirectoryChangeEvents(event.Name)
                        } else {
-                               // Send the event on the Events channel
-                               w.Events <- event
+                               // Send the event on the Events channel.
+                               select {
+                               case w.Events <- event:
+                               case <-w.done:
+                                       break loop
+                               }
                        }
 
                        if event.Op&Remove == Remove {
@@ -352,6 +350,18 @@ func (w *Watcher) readEvents() {
                        kevents = kevents[1:]
                }
        }
+
+       // cleanup
+       err := unix.Close(w.kq)
+       if err != nil {
+               // only way the previous loop breaks is if w.done was closed so we need to async send to w.Errors.
+               select {
+               case w.Errors <- err:
+               default:
+               }
+       }
+       close(w.Events)
+       close(w.Errors)
 }
 
 // newEvent returns an platform-independent Event based on kqueue Fflags.
@@ -407,7 +417,11 @@ func (w *Watcher) sendDirectoryChangeEvents(dirPath string) {
        // Get all files
        files, err := ioutil.ReadDir(dirPath)
        if err != nil {
-               w.Errors <- err
+               select {
+               case w.Errors <- err:
+               case <-w.done:
+                       return
+               }
        }
 
        // Search for new files
@@ -428,7 +442,11 @@ func (w *Watcher) sendFileCreatedEventIfNew(filePath string, fileInfo os.FileInf
        w.mu.Unlock()
        if !doesExist {
                // Send create event
-               w.Events <- newCreateEvent(filePath)
+               select {
+               case w.Events <- newCreateEvent(filePath):
+               case <-w.done:
+                       return
+               }
        }
 
        // like watchDirectoryFiles (but without doing another ReadDir)