]> go.fuhry.dev Git - fsnotify.git/commitdiff
all: add Watcher.{sendEvent,sendError} (#492)
authorMartin Tournoij <martin@arp242.net>
Sat, 6 Aug 2022 17:10:41 +0000 (19:10 +0200)
committerGitHub <noreply@github.com>
Sat, 6 Aug 2022 17:10:41 +0000 (19:10 +0200)
Add a helper function instead of doing a select on w.done and w.Errors
or w.Events every time.

Also move most functions to methods on the Watcher.

inotify.go
kqueue.go
windows.go

index 9f33b1867e5f159f4b1e239e0a16129554a5f6ef..258469f0dcaccc904859c4862df6b57bf379b9c1 100644 (file)
@@ -60,6 +60,26 @@ func NewWatcher() (*Watcher, error) {
        return w, nil
 }
 
+// Returns true if the event was sent, or false if watcher is closed.
+func (w *Watcher) sendEvent(e Event) bool {
+       select {
+       case w.Events <- e:
+               return true
+       case <-w.done:
+       }
+       return false
+}
+
+// Returns true if the error was sent, or false if watcher is closed.
+func (w *Watcher) sendError(err error) bool {
+       select {
+       case w.Errors <- err:
+               return true
+       case <-w.done:
+       }
+       return false
+}
+
 func (w *Watcher) isClosed() bool {
        select {
        case <-w.done:
@@ -100,12 +120,10 @@ func (w *Watcher) Add(name string) error {
                return errors.New("inotify instance already closed")
        }
 
-       const agnosticEvents = unix.IN_MOVED_TO | unix.IN_MOVED_FROM |
+       var flags uint32 = unix.IN_MOVED_TO | unix.IN_MOVED_FROM |
                unix.IN_CREATE | unix.IN_ATTRIB | unix.IN_MODIFY |
                unix.IN_MOVE_SELF | unix.IN_DELETE | unix.IN_DELETE_SELF
 
-       var flags uint32 = agnosticEvents
-
        w.mu.Lock()
        defer w.mu.Unlock()
        watchEntry := w.watches[name]
@@ -209,9 +227,7 @@ func (w *Watcher) readEvents() {
                case errors.Unwrap(err) == os.ErrClosed:
                        return
                case err != nil:
-                       select {
-                       case w.Errors <- err:
-                       case <-w.done:
+                       if !w.sendError(err) {
                                return
                        }
                        continue
@@ -229,9 +245,7 @@ func (w *Watcher) readEvents() {
                                // Read was too short.
                                err = errors.New("notify: short read in readEvents()")
                        }
-                       select {
-                       case w.Errors <- err:
-                       case <-w.done:
+                       if !w.sendError(err) {
                                return
                        }
                        continue
@@ -249,9 +263,7 @@ func (w *Watcher) readEvents() {
                        )
 
                        if mask&unix.IN_Q_OVERFLOW != 0 {
-                               select {
-                               case w.Errors <- ErrEventOverflow:
-                               case <-w.done:
+                               if !w.sendError(ErrEventOverflow) {
                                        return
                                }
                        }
@@ -279,13 +291,11 @@ func (w *Watcher) readEvents() {
                                name += "/" + strings.TrimRight(string(bytes[0:nameLen]), "\000")
                        }
 
-                       event := newEvent(name, mask)
+                       event := w.newEvent(name, mask)
 
                        // Send the events that are not ignored on the events channel
                        if mask&unix.IN_IGNORED == 0 {
-                               select {
-                               case w.Events <- event:
-                               case <-w.done:
+                               if !w.sendEvent(event) {
                                        return
                                }
                        }
@@ -297,7 +307,7 @@ func (w *Watcher) readEvents() {
 }
 
 // newEvent returns an platform-independent Event based on an inotify mask.
-func newEvent(name string, mask uint32) Event {
+func (w *Watcher) newEvent(name string, mask uint32) Event {
        e := Event{Name: name}
        if mask&unix.IN_CREATE == unix.IN_CREATE || mask&unix.IN_MOVED_TO == unix.IN_MOVED_TO {
                e.Op |= Create
index 765e85f86752137e7ce2d60b55a74ddeda51553c..91ce0997a14a988e9f3cc92c5f4765782d1680c6 100644 (file)
--- a/kqueue.go
+++ b/kqueue.go
@@ -44,7 +44,7 @@ type pathInfo struct {
 
 // NewWatcher establishes a new watcher with the underlying OS and begins waiting for events.
 func NewWatcher() (*Watcher, error) {
-       kq, closepipe, err := kqueue()
+       kq, closepipe, err := newKqueue()
        if err != nil {
                return nil, err
        }
@@ -67,6 +67,61 @@ func NewWatcher() (*Watcher, error) {
        return w, nil
 }
 
+// newKqueue creates a new kernel event queue and returns a descriptor.
+//
+// This registers a new event on closepipe, which will trigger an event when
+// it's closed. This way we can use kevent() without timeout/polling; without
+// the closepipe, it would block forever and we wouldn't be able to stop it at
+// all.
+func newKqueue() (kq int, closepipe [2]int, err error) {
+       kq, err = unix.Kqueue()
+       if kq == -1 {
+               return kq, closepipe, err
+       }
+
+       // Register the close pipe.
+       err = unix.Pipe(closepipe[:])
+       if err != nil {
+               unix.Close(kq)
+               return kq, closepipe, err
+       }
+
+       // Register changes to listen on the closepipe.
+       changes := make([]unix.Kevent_t, 1)
+       // SetKevent converts int to the platform-specific types.
+       unix.SetKevent(&changes[0], closepipe[0], unix.EVFILT_READ,
+               unix.EV_ADD|unix.EV_ENABLE|unix.EV_ONESHOT)
+
+       ok, err := unix.Kevent(kq, changes, nil, nil)
+       if ok == -1 {
+               unix.Close(kq)
+               unix.Close(closepipe[0])
+               unix.Close(closepipe[1])
+               return kq, closepipe, err
+       }
+       return kq, closepipe, nil
+}
+
+// Returns true if the event was sent, or false if watcher is closed.
+func (w *Watcher) sendEvent(e Event) bool {
+       select {
+       case w.Events <- e:
+               return true
+       case <-w.done:
+       }
+       return false
+}
+
+// Returns true if the error was sent, or false if watcher is closed.
+func (w *Watcher) sendError(err error) bool {
+       select {
+       case w.Errors <- err:
+               return true
+       case <-w.done:
+       }
+       return false
+}
+
 // Close removes all watches and closes the events channel.
 func (w *Watcher) Close() error {
        w.mu.Lock()
@@ -113,7 +168,7 @@ func (w *Watcher) Remove(name string) error {
                return fmt.Errorf("%w: %s", ErrNonExistentWatch, name)
        }
 
-       err := register(w.kq, []int{watchfd}, unix.EV_DELETE, 0)
+       err := w.register([]int{watchfd}, unix.EV_DELETE, 0)
        if err != nil {
                return err
        }
@@ -248,7 +303,7 @@ func (w *Watcher) addWatch(name string, flags uint32) (string, error) {
                isDir = fi.IsDir()
        }
 
-       err := register(w.kq, []int{watchfd}, unix.EV_ADD|unix.EV_CLEAR|unix.EV_ENABLE, flags)
+       err := w.register([]int{watchfd}, unix.EV_ADD|unix.EV_CLEAR|unix.EV_ENABLE, flags)
        if err != nil {
                unix.Close(watchfd)
                return "", err
@@ -306,14 +361,11 @@ func (w *Watcher) readEvents() {
        }()
 
        for closed := false; !closed; {
-               kevents, err := read(w.kq, eventBuffer)
+               kevents, err := w.read(eventBuffer)
                // EINTR is okay, the syscall was interrupted before timeout expired.
                if err != nil && err != unix.EINTR {
-                       select {
-                       case w.Errors <- err:
-                       case <-w.done:
+                       if !w.sendError(err) {
                                closed = true
-                               continue
                        }
                        continue
                }
@@ -336,7 +388,7 @@ func (w *Watcher) readEvents() {
                        path := w.paths[watchfd]
                        w.mu.Unlock()
 
-                       event := newEvent(path.name, mask)
+                       event := w.newEvent(path.name, mask)
 
                        if path.isDir && !event.Has(Remove) {
                                // Double check to make sure the directory exists. This can
@@ -358,10 +410,7 @@ func (w *Watcher) readEvents() {
                        if path.isDir && event.Has(Write) && !event.Has(Remove) {
                                w.sendDirectoryChangeEvents(event.Name)
                        } else {
-                               // Send the event on the Events channel.
-                               select {
-                               case w.Events <- event:
-                               case <-w.done:
+                               if !w.sendEvent(event) {
                                        closed = true
                                        continue
                                }
@@ -396,7 +445,7 @@ func (w *Watcher) readEvents() {
 }
 
 // newEvent returns an platform-independent Event based on kqueue Fflags.
-func newEvent(name string, mask uint32) Event {
+func (w *Watcher) newEvent(name string, mask uint32) Event {
        e := Event{Name: name}
        if mask&unix.NOTE_DELETE == unix.NOTE_DELETE {
                e.Op |= Remove
@@ -454,9 +503,7 @@ func (w *Watcher) sendDirectoryChangeEvents(dirPath string) {
        // Get all files
        files, err := ioutil.ReadDir(dirPath)
        if err != nil {
-               select {
-               case w.Errors <- err:
-               case <-w.done:
+               if !w.sendError(err) {
                        return
                }
        }
@@ -478,9 +525,7 @@ func (w *Watcher) sendFileCreatedEventIfNew(filePath string, fileInfo os.FileInf
        w.mu.Unlock()
        if !doesExist {
                // Send create event
-               select {
-               case w.Events <- Event{Name: filePath, Op: Create}:
-               case <-w.done:
+               if !w.sendEvent(Event{Name: filePath, Op: Create}) {
                        return
                }
        }
@@ -514,43 +559,8 @@ func (w *Watcher) internalWatch(name string, fileInfo os.FileInfo) (string, erro
        return w.addWatch(name, noteAllEvents)
 }
 
-// kqueue creates a new kernel event queue and returns a descriptor.
-//
-// This registers a new event on closepipe, which will trigger an event when
-// it's closed. This way we can use kevent() without timeout/polling; without
-// the closepipe, it would block forever and we wouldn't be able to stop it at
-// all.
-func kqueue() (kq int, closepipe [2]int, err error) {
-       kq, err = unix.Kqueue()
-       if kq == -1 {
-               return kq, closepipe, err
-       }
-
-       // Register the close pipe.
-       err = unix.Pipe(closepipe[:])
-       if err != nil {
-               unix.Close(kq)
-               return kq, closepipe, err
-       }
-
-       // Register changes to listen on the closepipe.
-       changes := make([]unix.Kevent_t, 1)
-       // SetKevent converts int to the platform-specific types.
-       unix.SetKevent(&changes[0], closepipe[0], unix.EVFILT_READ,
-               unix.EV_ADD|unix.EV_ENABLE|unix.EV_ONESHOT)
-
-       ok, err := unix.Kevent(kq, changes, nil, nil)
-       if ok == -1 {
-               unix.Close(kq)
-               unix.Close(closepipe[0])
-               unix.Close(closepipe[1])
-               return kq, closepipe, err
-       }
-       return kq, closepipe, nil
-}
-
 // Register events with the queue.
-func register(kq int, fds []int, flags int, fflags uint32) error {
+func (w *Watcher) register(fds []int, flags int, fflags uint32) error {
        changes := make([]unix.Kevent_t, len(fds))
        for i, fd := range fds {
                // SetKevent converts int to the platform-specific types.
@@ -559,7 +569,7 @@ func register(kq int, fds []int, flags int, fflags uint32) error {
        }
 
        // Register the events.
-       success, err := unix.Kevent(kq, changes, nil, nil)
+       success, err := unix.Kevent(w.kq, changes, nil, nil)
        if success == -1 {
                return err
        }
@@ -567,9 +577,8 @@ func register(kq int, fds []int, flags int, fflags uint32) error {
 }
 
 // read retrieves pending events, or waits until an event occurs.
-// A timeout of nil blocks indefinitely, while 0 polls the queue.
-func read(kq int, events []unix.Kevent_t) ([]unix.Kevent_t, error) {
-       n, err := unix.Kevent(kq, nil, events, nil)
+func (w *Watcher) read(events []unix.Kevent_t) ([]unix.Kevent_t, error) {
+       n, err := unix.Kevent(w.kq, nil, events, nil)
        if err != nil {
                return nil, err
        }
index 3140b8bb9fa2f8acf08a564f97dea27cbad22698..17ada311debb6fc702fbdc67b152951c9794547f 100644 (file)
@@ -53,6 +53,30 @@ func NewWatcher() (*Watcher, error) {
        return w, nil
 }
 
+func (w *Watcher) sendEvent(name string, mask uint64) bool {
+       if mask == 0 {
+               return false
+       }
+
+       event := w.newEvent(name, uint32(mask))
+       select {
+       case ch := <-w.quit:
+               w.quit <- ch
+       case w.Events <- event:
+       }
+       return true
+}
+
+// Returns true if the error was sent, or false if watcher is closed.
+func (w *Watcher) sendError(err error) bool {
+       select {
+       case w.Errors <- err:
+               return true
+       case <-w.quit:
+       }
+       return false
+}
+
 // Close removes all watches and closes the events channel.
 func (w *Watcher) Close() error {
        w.mu.Lock()
@@ -123,6 +147,10 @@ func (w *Watcher) WatchList() []string {
        return entries
 }
 
+// These options are from the old golang.org/x/exp/winfsnotify, where you could
+// add various options to the watch. This has long since been removed.
+//
+// The "sys" in the name is misleading as they're not part of any "system".
 const (
        // Options for AddWatch
        sysFSONESHOT = 0x80000000
@@ -147,7 +175,7 @@ const (
        sysFSQOVERFLOW = 0x4000
 )
 
-func newEvent(name string, mask uint32) Event {
+func (w *Watcher) newEvent(name string, mask uint32) Event {
        e := Event{Name: name}
        if mask&sysFSCREATE == sysFSCREATE || mask&sysFSMOVEDTO == sysFSMOVEDTO {
                e.Op |= Create
@@ -212,7 +240,7 @@ func (w *Watcher) wakeupReader() error {
        return nil
 }
 
-func getDir(pathname string) (dir string, err error) {
+func (w *Watcher) getDir(pathname string) (dir string, err error) {
        attr, err := windows.GetFileAttributes(windows.StringToUTF16Ptr(pathname))
        if err != nil {
                return "", os.NewSyscallError("GetFileAttributes", err)
@@ -226,7 +254,7 @@ func getDir(pathname string) (dir string, err error) {
        return
 }
 
-func getIno(path string) (ino *inode, err error) {
+func (w *Watcher) getIno(path string) (ino *inode, err error) {
        h, err := windows.CreateFile(windows.StringToUTF16Ptr(path),
                windows.FILE_LIST_DIRECTORY,
                windows.FILE_SHARE_READ|windows.FILE_SHARE_WRITE|windows.FILE_SHARE_DELETE,
@@ -270,7 +298,7 @@ func (m watchMap) set(ino *inode, watch *watch) {
 
 // Must run within the I/O thread.
 func (w *Watcher) addWatch(pathname string, flags uint64) error {
-       dir, err := getDir(pathname)
+       dir, err := w.getDir(pathname)
        if err != nil {
                return err
        }
@@ -278,7 +306,7 @@ func (w *Watcher) addWatch(pathname string, flags uint64) error {
                return nil
        }
 
-       ino, err := getIno(dir)
+       ino, err := w.getIno(dir)
        if err != nil {
                return err
        }
@@ -324,11 +352,11 @@ func (w *Watcher) addWatch(pathname string, flags uint64) error {
 
 // Must run within the I/O thread.
 func (w *Watcher) remWatch(pathname string) error {
-       dir, err := getDir(pathname)
+       dir, err := w.getDir(pathname)
        if err != nil {
                return err
        }
-       ino, err := getIno(dir)
+       ino, err := w.getIno(dir)
        if err != nil {
                return err
        }
@@ -339,7 +367,7 @@ func (w *Watcher) remWatch(pathname string) error {
 
        err = windows.CloseHandle(ino.handle)
        if err != nil {
-               w.Errors <- os.NewSyscallError("CloseHandle", err)
+               w.sendError(os.NewSyscallError("CloseHandle", err))
        }
        if watch == nil {
                return fmt.Errorf("%w: %s", ErrNonExistentWatch, pathname)
@@ -375,17 +403,17 @@ func (w *Watcher) deleteWatch(watch *watch) {
 func (w *Watcher) startRead(watch *watch) error {
        err := windows.CancelIo(watch.ino.handle)
        if err != nil {
-               w.Errors <- os.NewSyscallError("CancelIo", err)
+               w.sendError(os.NewSyscallError("CancelIo", err))
                w.deleteWatch(watch)
        }
-       mask := toWindowsFlags(watch.mask)
+       mask := w.toWindowsFlags(watch.mask)
        for _, m := range watch.names {
-               mask |= toWindowsFlags(m)
+               mask |= w.toWindowsFlags(m)
        }
        if mask == 0 {
                err := windows.CloseHandle(watch.ino.handle)
                if err != nil {
-                       w.Errors <- os.NewSyscallError("CloseHandle", err)
+                       w.sendError(os.NewSyscallError("CloseHandle", err))
                }
                w.mu.Lock()
                delete(w.watches[watch.ino.volume], watch.ino.index)
@@ -469,7 +497,7 @@ func (w *Watcher) readEvents() {
                switch qErr {
                case windows.ERROR_MORE_DATA:
                        if watch == nil {
-                               w.Errors <- errors.New("ERROR_MORE_DATA has unexpectedly null lpOverlapped buffer")
+                               w.sendError(errors.New("ERROR_MORE_DATA has unexpectedly null lpOverlapped buffer"))
                        } else {
                                // The i/o succeeded but the buffer is full.
                                // In theory we should be building up a full packet.
@@ -486,7 +514,7 @@ func (w *Watcher) readEvents() {
                        // CancelIo was called on this handle
                        continue
                default:
-                       w.Errors <- os.NewSyscallError("GetQueuedCompletionPort", qErr)
+                       w.sendError(os.NewSyscallError("GetQueuedCompletionPort", qErr))
                        continue
                case nil:
                }
@@ -494,18 +522,18 @@ func (w *Watcher) readEvents() {
                var offset uint32
                for {
                        if n == 0 {
-                               w.Events <- newEvent("", sysFSQOVERFLOW)
-                               w.Errors <- errors.New("short read in readEvents()")
+                               w.Events <- w.newEvent("", sysFSQOVERFLOW)
+                               w.sendError(errors.New("short read in readEvents()"))
                                break
                        }
 
                        // Point "raw" to the event in the buffer
                        raw := (*windows.FileNotifyInformation)(unsafe.Pointer(&watch.buf[offset]))
-                       // TODO: Consider using unsafe.Slice that is available from go1.17
-                       // https://stackoverflow.com/questions/51187973/how-to-create-an-array-or-a-slice-from-an-array-unsafe-pointer-in-golang
-                       // instead of using a fixed windows.MAX_PATH buf, we create a buf that is the size of the path name
+
+                       // Create a buf that is the size of the path name
                        size := int(raw.FileNameLength / 2)
                        var buf []uint16
+                       // TODO: Use unsafe.Slice in Go 1.17; https://stackoverflow.com/questions/51187973
                        sh := (*reflect.SliceHeader)(unsafe.Pointer(&buf))
                        sh.Data = uintptr(unsafe.Pointer(&raw.FileName))
                        sh.Len = size
@@ -555,7 +583,7 @@ func (w *Watcher) readEvents() {
                                w.sendEvent(fullname, watch.names[name]&sysFSIGNORED)
                                delete(watch.names, name)
                        }
-                       if w.sendEvent(fullname, watch.mask&toFSnotifyFlags(raw.Action)) {
+                       if w.sendEvent(fullname, watch.mask&w.toFSnotifyFlags(raw.Action)) {
                                if watch.mask&sysFSONESHOT != 0 {
                                        watch.mask = 0
                                }
@@ -573,31 +601,19 @@ func (w *Watcher) readEvents() {
 
                        // Error!
                        if offset >= n {
-                               w.Errors <- errors.New("Windows system assumed buffer larger than it is, events have likely been missed.")
+                               w.sendError(errors.New(
+                                       "Windows system assumed buffer larger than it is, events have likely been missed."))
                                break
                        }
                }
 
                if err := w.startRead(watch); err != nil {
-                       w.Errors <- err
+                       w.sendError(err)
                }
        }
 }
 
-func (w *Watcher) sendEvent(name string, mask uint64) bool {
-       if mask == 0 {
-               return false
-       }
-       event := newEvent(name, uint32(mask))
-       select {
-       case ch := <-w.quit:
-               w.quit <- ch
-       case w.Events <- event:
-       }
-       return true
-}
-
-func toWindowsFlags(mask uint64) uint32 {
+func (w *Watcher) toWindowsFlags(mask uint64) uint32 {
        var m uint32
        if mask&sysFSACCESS != 0 {
                m |= windows.FILE_NOTIFY_CHANGE_LAST_ACCESS
@@ -614,7 +630,7 @@ func toWindowsFlags(mask uint64) uint32 {
        return m
 }
 
-func toFSnotifyFlags(action uint32) uint64 {
+func (w *Watcher) toFSnotifyFlags(action uint32) uint64 {
        switch action {
        case windows.FILE_ACTION_ADDED:
                return sysFSCREATE