From 2bfaa0051c953671a8879575f97ca718c52f8779 Mon Sep 17 00:00:00 2001 From: Martin Tournoij Date: Sat, 6 Aug 2022 19:10:41 +0200 Subject: [PATCH] all: add Watcher.{sendEvent,sendError} (#492) Add a helper function instead of doing a select on w.done and w.Errors or w.Events every time. Also move most functions to methods on the Watcher. --- inotify.go | 44 +++++++++++------- kqueue.go | 129 ++++++++++++++++++++++++++++------------------------- windows.go | 90 ++++++++++++++++++++++--------------- 3 files changed, 149 insertions(+), 114 deletions(-) diff --git a/inotify.go b/inotify.go index 9f33b18..258469f 100644 --- a/inotify.go +++ b/inotify.go @@ -60,6 +60,26 @@ func NewWatcher() (*Watcher, error) { return w, nil } +// Returns true if the event was sent, or false if watcher is closed. +func (w *Watcher) sendEvent(e Event) bool { + select { + case w.Events <- e: + return true + case <-w.done: + } + return false +} + +// Returns true if the error was sent, or false if watcher is closed. +func (w *Watcher) sendError(err error) bool { + select { + case w.Errors <- err: + return true + case <-w.done: + } + return false +} + func (w *Watcher) isClosed() bool { select { case <-w.done: @@ -100,12 +120,10 @@ func (w *Watcher) Add(name string) error { return errors.New("inotify instance already closed") } - const agnosticEvents = unix.IN_MOVED_TO | unix.IN_MOVED_FROM | + var flags uint32 = unix.IN_MOVED_TO | unix.IN_MOVED_FROM | unix.IN_CREATE | unix.IN_ATTRIB | unix.IN_MODIFY | unix.IN_MOVE_SELF | unix.IN_DELETE | unix.IN_DELETE_SELF - var flags uint32 = agnosticEvents - w.mu.Lock() defer w.mu.Unlock() watchEntry := w.watches[name] @@ -209,9 +227,7 @@ func (w *Watcher) readEvents() { case errors.Unwrap(err) == os.ErrClosed: return case err != nil: - select { - case w.Errors <- err: - case <-w.done: + if !w.sendError(err) { return } continue @@ -229,9 +245,7 @@ func (w *Watcher) readEvents() { // Read was too short. err = errors.New("notify: short read in readEvents()") } - select { - case w.Errors <- err: - case <-w.done: + if !w.sendError(err) { return } continue @@ -249,9 +263,7 @@ func (w *Watcher) readEvents() { ) if mask&unix.IN_Q_OVERFLOW != 0 { - select { - case w.Errors <- ErrEventOverflow: - case <-w.done: + if !w.sendError(ErrEventOverflow) { return } } @@ -279,13 +291,11 @@ func (w *Watcher) readEvents() { name += "/" + strings.TrimRight(string(bytes[0:nameLen]), "\000") } - event := newEvent(name, mask) + event := w.newEvent(name, mask) // Send the events that are not ignored on the events channel if mask&unix.IN_IGNORED == 0 { - select { - case w.Events <- event: - case <-w.done: + if !w.sendEvent(event) { return } } @@ -297,7 +307,7 @@ func (w *Watcher) readEvents() { } // newEvent returns an platform-independent Event based on an inotify mask. -func newEvent(name string, mask uint32) Event { +func (w *Watcher) newEvent(name string, mask uint32) Event { e := Event{Name: name} if mask&unix.IN_CREATE == unix.IN_CREATE || mask&unix.IN_MOVED_TO == unix.IN_MOVED_TO { e.Op |= Create diff --git a/kqueue.go b/kqueue.go index 765e85f..91ce099 100644 --- a/kqueue.go +++ b/kqueue.go @@ -44,7 +44,7 @@ type pathInfo struct { // NewWatcher establishes a new watcher with the underlying OS and begins waiting for events. func NewWatcher() (*Watcher, error) { - kq, closepipe, err := kqueue() + kq, closepipe, err := newKqueue() if err != nil { return nil, err } @@ -67,6 +67,61 @@ func NewWatcher() (*Watcher, error) { return w, nil } +// newKqueue creates a new kernel event queue and returns a descriptor. +// +// This registers a new event on closepipe, which will trigger an event when +// it's closed. This way we can use kevent() without timeout/polling; without +// the closepipe, it would block forever and we wouldn't be able to stop it at +// all. +func newKqueue() (kq int, closepipe [2]int, err error) { + kq, err = unix.Kqueue() + if kq == -1 { + return kq, closepipe, err + } + + // Register the close pipe. + err = unix.Pipe(closepipe[:]) + if err != nil { + unix.Close(kq) + return kq, closepipe, err + } + + // Register changes to listen on the closepipe. + changes := make([]unix.Kevent_t, 1) + // SetKevent converts int to the platform-specific types. + unix.SetKevent(&changes[0], closepipe[0], unix.EVFILT_READ, + unix.EV_ADD|unix.EV_ENABLE|unix.EV_ONESHOT) + + ok, err := unix.Kevent(kq, changes, nil, nil) + if ok == -1 { + unix.Close(kq) + unix.Close(closepipe[0]) + unix.Close(closepipe[1]) + return kq, closepipe, err + } + return kq, closepipe, nil +} + +// Returns true if the event was sent, or false if watcher is closed. +func (w *Watcher) sendEvent(e Event) bool { + select { + case w.Events <- e: + return true + case <-w.done: + } + return false +} + +// Returns true if the error was sent, or false if watcher is closed. +func (w *Watcher) sendError(err error) bool { + select { + case w.Errors <- err: + return true + case <-w.done: + } + return false +} + // Close removes all watches and closes the events channel. func (w *Watcher) Close() error { w.mu.Lock() @@ -113,7 +168,7 @@ func (w *Watcher) Remove(name string) error { return fmt.Errorf("%w: %s", ErrNonExistentWatch, name) } - err := register(w.kq, []int{watchfd}, unix.EV_DELETE, 0) + err := w.register([]int{watchfd}, unix.EV_DELETE, 0) if err != nil { return err } @@ -248,7 +303,7 @@ func (w *Watcher) addWatch(name string, flags uint32) (string, error) { isDir = fi.IsDir() } - err := register(w.kq, []int{watchfd}, unix.EV_ADD|unix.EV_CLEAR|unix.EV_ENABLE, flags) + err := w.register([]int{watchfd}, unix.EV_ADD|unix.EV_CLEAR|unix.EV_ENABLE, flags) if err != nil { unix.Close(watchfd) return "", err @@ -306,14 +361,11 @@ func (w *Watcher) readEvents() { }() for closed := false; !closed; { - kevents, err := read(w.kq, eventBuffer) + kevents, err := w.read(eventBuffer) // EINTR is okay, the syscall was interrupted before timeout expired. if err != nil && err != unix.EINTR { - select { - case w.Errors <- err: - case <-w.done: + if !w.sendError(err) { closed = true - continue } continue } @@ -336,7 +388,7 @@ func (w *Watcher) readEvents() { path := w.paths[watchfd] w.mu.Unlock() - event := newEvent(path.name, mask) + event := w.newEvent(path.name, mask) if path.isDir && !event.Has(Remove) { // Double check to make sure the directory exists. This can @@ -358,10 +410,7 @@ func (w *Watcher) readEvents() { if path.isDir && event.Has(Write) && !event.Has(Remove) { w.sendDirectoryChangeEvents(event.Name) } else { - // Send the event on the Events channel. - select { - case w.Events <- event: - case <-w.done: + if !w.sendEvent(event) { closed = true continue } @@ -396,7 +445,7 @@ func (w *Watcher) readEvents() { } // newEvent returns an platform-independent Event based on kqueue Fflags. -func newEvent(name string, mask uint32) Event { +func (w *Watcher) newEvent(name string, mask uint32) Event { e := Event{Name: name} if mask&unix.NOTE_DELETE == unix.NOTE_DELETE { e.Op |= Remove @@ -454,9 +503,7 @@ func (w *Watcher) sendDirectoryChangeEvents(dirPath string) { // Get all files files, err := ioutil.ReadDir(dirPath) if err != nil { - select { - case w.Errors <- err: - case <-w.done: + if !w.sendError(err) { return } } @@ -478,9 +525,7 @@ func (w *Watcher) sendFileCreatedEventIfNew(filePath string, fileInfo os.FileInf w.mu.Unlock() if !doesExist { // Send create event - select { - case w.Events <- Event{Name: filePath, Op: Create}: - case <-w.done: + if !w.sendEvent(Event{Name: filePath, Op: Create}) { return } } @@ -514,43 +559,8 @@ func (w *Watcher) internalWatch(name string, fileInfo os.FileInfo) (string, erro return w.addWatch(name, noteAllEvents) } -// kqueue creates a new kernel event queue and returns a descriptor. -// -// This registers a new event on closepipe, which will trigger an event when -// it's closed. This way we can use kevent() without timeout/polling; without -// the closepipe, it would block forever and we wouldn't be able to stop it at -// all. -func kqueue() (kq int, closepipe [2]int, err error) { - kq, err = unix.Kqueue() - if kq == -1 { - return kq, closepipe, err - } - - // Register the close pipe. - err = unix.Pipe(closepipe[:]) - if err != nil { - unix.Close(kq) - return kq, closepipe, err - } - - // Register changes to listen on the closepipe. - changes := make([]unix.Kevent_t, 1) - // SetKevent converts int to the platform-specific types. - unix.SetKevent(&changes[0], closepipe[0], unix.EVFILT_READ, - unix.EV_ADD|unix.EV_ENABLE|unix.EV_ONESHOT) - - ok, err := unix.Kevent(kq, changes, nil, nil) - if ok == -1 { - unix.Close(kq) - unix.Close(closepipe[0]) - unix.Close(closepipe[1]) - return kq, closepipe, err - } - return kq, closepipe, nil -} - // Register events with the queue. -func register(kq int, fds []int, flags int, fflags uint32) error { +func (w *Watcher) register(fds []int, flags int, fflags uint32) error { changes := make([]unix.Kevent_t, len(fds)) for i, fd := range fds { // SetKevent converts int to the platform-specific types. @@ -559,7 +569,7 @@ func register(kq int, fds []int, flags int, fflags uint32) error { } // Register the events. - success, err := unix.Kevent(kq, changes, nil, nil) + success, err := unix.Kevent(w.kq, changes, nil, nil) if success == -1 { return err } @@ -567,9 +577,8 @@ func register(kq int, fds []int, flags int, fflags uint32) error { } // read retrieves pending events, or waits until an event occurs. -// A timeout of nil blocks indefinitely, while 0 polls the queue. -func read(kq int, events []unix.Kevent_t) ([]unix.Kevent_t, error) { - n, err := unix.Kevent(kq, nil, events, nil) +func (w *Watcher) read(events []unix.Kevent_t) ([]unix.Kevent_t, error) { + n, err := unix.Kevent(w.kq, nil, events, nil) if err != nil { return nil, err } diff --git a/windows.go b/windows.go index 3140b8b..17ada31 100644 --- a/windows.go +++ b/windows.go @@ -53,6 +53,30 @@ func NewWatcher() (*Watcher, error) { return w, nil } +func (w *Watcher) sendEvent(name string, mask uint64) bool { + if mask == 0 { + return false + } + + event := w.newEvent(name, uint32(mask)) + select { + case ch := <-w.quit: + w.quit <- ch + case w.Events <- event: + } + return true +} + +// Returns true if the error was sent, or false if watcher is closed. +func (w *Watcher) sendError(err error) bool { + select { + case w.Errors <- err: + return true + case <-w.quit: + } + return false +} + // Close removes all watches and closes the events channel. func (w *Watcher) Close() error { w.mu.Lock() @@ -123,6 +147,10 @@ func (w *Watcher) WatchList() []string { return entries } +// These options are from the old golang.org/x/exp/winfsnotify, where you could +// add various options to the watch. This has long since been removed. +// +// The "sys" in the name is misleading as they're not part of any "system". const ( // Options for AddWatch sysFSONESHOT = 0x80000000 @@ -147,7 +175,7 @@ const ( sysFSQOVERFLOW = 0x4000 ) -func newEvent(name string, mask uint32) Event { +func (w *Watcher) newEvent(name string, mask uint32) Event { e := Event{Name: name} if mask&sysFSCREATE == sysFSCREATE || mask&sysFSMOVEDTO == sysFSMOVEDTO { e.Op |= Create @@ -212,7 +240,7 @@ func (w *Watcher) wakeupReader() error { return nil } -func getDir(pathname string) (dir string, err error) { +func (w *Watcher) getDir(pathname string) (dir string, err error) { attr, err := windows.GetFileAttributes(windows.StringToUTF16Ptr(pathname)) if err != nil { return "", os.NewSyscallError("GetFileAttributes", err) @@ -226,7 +254,7 @@ func getDir(pathname string) (dir string, err error) { return } -func getIno(path string) (ino *inode, err error) { +func (w *Watcher) getIno(path string) (ino *inode, err error) { h, err := windows.CreateFile(windows.StringToUTF16Ptr(path), windows.FILE_LIST_DIRECTORY, windows.FILE_SHARE_READ|windows.FILE_SHARE_WRITE|windows.FILE_SHARE_DELETE, @@ -270,7 +298,7 @@ func (m watchMap) set(ino *inode, watch *watch) { // Must run within the I/O thread. func (w *Watcher) addWatch(pathname string, flags uint64) error { - dir, err := getDir(pathname) + dir, err := w.getDir(pathname) if err != nil { return err } @@ -278,7 +306,7 @@ func (w *Watcher) addWatch(pathname string, flags uint64) error { return nil } - ino, err := getIno(dir) + ino, err := w.getIno(dir) if err != nil { return err } @@ -324,11 +352,11 @@ func (w *Watcher) addWatch(pathname string, flags uint64) error { // Must run within the I/O thread. func (w *Watcher) remWatch(pathname string) error { - dir, err := getDir(pathname) + dir, err := w.getDir(pathname) if err != nil { return err } - ino, err := getIno(dir) + ino, err := w.getIno(dir) if err != nil { return err } @@ -339,7 +367,7 @@ func (w *Watcher) remWatch(pathname string) error { err = windows.CloseHandle(ino.handle) if err != nil { - w.Errors <- os.NewSyscallError("CloseHandle", err) + w.sendError(os.NewSyscallError("CloseHandle", err)) } if watch == nil { return fmt.Errorf("%w: %s", ErrNonExistentWatch, pathname) @@ -375,17 +403,17 @@ func (w *Watcher) deleteWatch(watch *watch) { func (w *Watcher) startRead(watch *watch) error { err := windows.CancelIo(watch.ino.handle) if err != nil { - w.Errors <- os.NewSyscallError("CancelIo", err) + w.sendError(os.NewSyscallError("CancelIo", err)) w.deleteWatch(watch) } - mask := toWindowsFlags(watch.mask) + mask := w.toWindowsFlags(watch.mask) for _, m := range watch.names { - mask |= toWindowsFlags(m) + mask |= w.toWindowsFlags(m) } if mask == 0 { err := windows.CloseHandle(watch.ino.handle) if err != nil { - w.Errors <- os.NewSyscallError("CloseHandle", err) + w.sendError(os.NewSyscallError("CloseHandle", err)) } w.mu.Lock() delete(w.watches[watch.ino.volume], watch.ino.index) @@ -469,7 +497,7 @@ func (w *Watcher) readEvents() { switch qErr { case windows.ERROR_MORE_DATA: if watch == nil { - w.Errors <- errors.New("ERROR_MORE_DATA has unexpectedly null lpOverlapped buffer") + w.sendError(errors.New("ERROR_MORE_DATA has unexpectedly null lpOverlapped buffer")) } else { // The i/o succeeded but the buffer is full. // In theory we should be building up a full packet. @@ -486,7 +514,7 @@ func (w *Watcher) readEvents() { // CancelIo was called on this handle continue default: - w.Errors <- os.NewSyscallError("GetQueuedCompletionPort", qErr) + w.sendError(os.NewSyscallError("GetQueuedCompletionPort", qErr)) continue case nil: } @@ -494,18 +522,18 @@ func (w *Watcher) readEvents() { var offset uint32 for { if n == 0 { - w.Events <- newEvent("", sysFSQOVERFLOW) - w.Errors <- errors.New("short read in readEvents()") + w.Events <- w.newEvent("", sysFSQOVERFLOW) + w.sendError(errors.New("short read in readEvents()")) break } // Point "raw" to the event in the buffer raw := (*windows.FileNotifyInformation)(unsafe.Pointer(&watch.buf[offset])) - // TODO: Consider using unsafe.Slice that is available from go1.17 - // https://stackoverflow.com/questions/51187973/how-to-create-an-array-or-a-slice-from-an-array-unsafe-pointer-in-golang - // instead of using a fixed windows.MAX_PATH buf, we create a buf that is the size of the path name + + // Create a buf that is the size of the path name size := int(raw.FileNameLength / 2) var buf []uint16 + // TODO: Use unsafe.Slice in Go 1.17; https://stackoverflow.com/questions/51187973 sh := (*reflect.SliceHeader)(unsafe.Pointer(&buf)) sh.Data = uintptr(unsafe.Pointer(&raw.FileName)) sh.Len = size @@ -555,7 +583,7 @@ func (w *Watcher) readEvents() { w.sendEvent(fullname, watch.names[name]&sysFSIGNORED) delete(watch.names, name) } - if w.sendEvent(fullname, watch.mask&toFSnotifyFlags(raw.Action)) { + if w.sendEvent(fullname, watch.mask&w.toFSnotifyFlags(raw.Action)) { if watch.mask&sysFSONESHOT != 0 { watch.mask = 0 } @@ -573,31 +601,19 @@ func (w *Watcher) readEvents() { // Error! if offset >= n { - w.Errors <- errors.New("Windows system assumed buffer larger than it is, events have likely been missed.") + w.sendError(errors.New( + "Windows system assumed buffer larger than it is, events have likely been missed.")) break } } if err := w.startRead(watch); err != nil { - w.Errors <- err + w.sendError(err) } } } -func (w *Watcher) sendEvent(name string, mask uint64) bool { - if mask == 0 { - return false - } - event := newEvent(name, uint32(mask)) - select { - case ch := <-w.quit: - w.quit <- ch - case w.Events <- event: - } - return true -} - -func toWindowsFlags(mask uint64) uint32 { +func (w *Watcher) toWindowsFlags(mask uint64) uint32 { var m uint32 if mask&sysFSACCESS != 0 { m |= windows.FILE_NOTIFY_CHANGE_LAST_ACCESS @@ -614,7 +630,7 @@ func toWindowsFlags(mask uint64) uint32 { return m } -func toFSnotifyFlags(action uint32) uint64 { +func (w *Watcher) toFSnotifyFlags(action uint32) uint64 { switch action { case windows.FILE_ACTION_ADDED: return sysFSCREATE -- 2.50.1