return w, nil
}
+// Returns true if the event was sent, or false if watcher is closed.
+func (w *Watcher) sendEvent(e Event) bool {
+ select {
+ case w.Events <- e:
+ return true
+ case <-w.done:
+ }
+ return false
+}
+
+// Returns true if the error was sent, or false if watcher is closed.
+func (w *Watcher) sendError(err error) bool {
+ select {
+ case w.Errors <- err:
+ return true
+ case <-w.done:
+ }
+ return false
+}
+
func (w *Watcher) isClosed() bool {
select {
case <-w.done:
return errors.New("inotify instance already closed")
}
- const agnosticEvents = unix.IN_MOVED_TO | unix.IN_MOVED_FROM |
+ var flags uint32 = unix.IN_MOVED_TO | unix.IN_MOVED_FROM |
unix.IN_CREATE | unix.IN_ATTRIB | unix.IN_MODIFY |
unix.IN_MOVE_SELF | unix.IN_DELETE | unix.IN_DELETE_SELF
- var flags uint32 = agnosticEvents
-
w.mu.Lock()
defer w.mu.Unlock()
watchEntry := w.watches[name]
case errors.Unwrap(err) == os.ErrClosed:
return
case err != nil:
- select {
- case w.Errors <- err:
- case <-w.done:
+ if !w.sendError(err) {
return
}
continue
// Read was too short.
err = errors.New("notify: short read in readEvents()")
}
- select {
- case w.Errors <- err:
- case <-w.done:
+ if !w.sendError(err) {
return
}
continue
)
if mask&unix.IN_Q_OVERFLOW != 0 {
- select {
- case w.Errors <- ErrEventOverflow:
- case <-w.done:
+ if !w.sendError(ErrEventOverflow) {
return
}
}
name += "/" + strings.TrimRight(string(bytes[0:nameLen]), "\000")
}
- event := newEvent(name, mask)
+ event := w.newEvent(name, mask)
// Send the events that are not ignored on the events channel
if mask&unix.IN_IGNORED == 0 {
- select {
- case w.Events <- event:
- case <-w.done:
+ if !w.sendEvent(event) {
return
}
}
}
// newEvent returns an platform-independent Event based on an inotify mask.
-func newEvent(name string, mask uint32) Event {
+func (w *Watcher) newEvent(name string, mask uint32) Event {
e := Event{Name: name}
if mask&unix.IN_CREATE == unix.IN_CREATE || mask&unix.IN_MOVED_TO == unix.IN_MOVED_TO {
e.Op |= Create
// NewWatcher establishes a new watcher with the underlying OS and begins waiting for events.
func NewWatcher() (*Watcher, error) {
- kq, closepipe, err := kqueue()
+ kq, closepipe, err := newKqueue()
if err != nil {
return nil, err
}
return w, nil
}
+// newKqueue creates a new kernel event queue and returns a descriptor.
+//
+// This registers a new event on closepipe, which will trigger an event when
+// it's closed. This way we can use kevent() without timeout/polling; without
+// the closepipe, it would block forever and we wouldn't be able to stop it at
+// all.
+func newKqueue() (kq int, closepipe [2]int, err error) {
+ kq, err = unix.Kqueue()
+ if kq == -1 {
+ return kq, closepipe, err
+ }
+
+ // Register the close pipe.
+ err = unix.Pipe(closepipe[:])
+ if err != nil {
+ unix.Close(kq)
+ return kq, closepipe, err
+ }
+
+ // Register changes to listen on the closepipe.
+ changes := make([]unix.Kevent_t, 1)
+ // SetKevent converts int to the platform-specific types.
+ unix.SetKevent(&changes[0], closepipe[0], unix.EVFILT_READ,
+ unix.EV_ADD|unix.EV_ENABLE|unix.EV_ONESHOT)
+
+ ok, err := unix.Kevent(kq, changes, nil, nil)
+ if ok == -1 {
+ unix.Close(kq)
+ unix.Close(closepipe[0])
+ unix.Close(closepipe[1])
+ return kq, closepipe, err
+ }
+ return kq, closepipe, nil
+}
+
+// Returns true if the event was sent, or false if watcher is closed.
+func (w *Watcher) sendEvent(e Event) bool {
+ select {
+ case w.Events <- e:
+ return true
+ case <-w.done:
+ }
+ return false
+}
+
+// Returns true if the error was sent, or false if watcher is closed.
+func (w *Watcher) sendError(err error) bool {
+ select {
+ case w.Errors <- err:
+ return true
+ case <-w.done:
+ }
+ return false
+}
+
// Close removes all watches and closes the events channel.
func (w *Watcher) Close() error {
w.mu.Lock()
return fmt.Errorf("%w: %s", ErrNonExistentWatch, name)
}
- err := register(w.kq, []int{watchfd}, unix.EV_DELETE, 0)
+ err := w.register([]int{watchfd}, unix.EV_DELETE, 0)
if err != nil {
return err
}
isDir = fi.IsDir()
}
- err := register(w.kq, []int{watchfd}, unix.EV_ADD|unix.EV_CLEAR|unix.EV_ENABLE, flags)
+ err := w.register([]int{watchfd}, unix.EV_ADD|unix.EV_CLEAR|unix.EV_ENABLE, flags)
if err != nil {
unix.Close(watchfd)
return "", err
}()
for closed := false; !closed; {
- kevents, err := read(w.kq, eventBuffer)
+ kevents, err := w.read(eventBuffer)
// EINTR is okay, the syscall was interrupted before timeout expired.
if err != nil && err != unix.EINTR {
- select {
- case w.Errors <- err:
- case <-w.done:
+ if !w.sendError(err) {
closed = true
- continue
}
continue
}
path := w.paths[watchfd]
w.mu.Unlock()
- event := newEvent(path.name, mask)
+ event := w.newEvent(path.name, mask)
if path.isDir && !event.Has(Remove) {
// Double check to make sure the directory exists. This can
if path.isDir && event.Has(Write) && !event.Has(Remove) {
w.sendDirectoryChangeEvents(event.Name)
} else {
- // Send the event on the Events channel.
- select {
- case w.Events <- event:
- case <-w.done:
+ if !w.sendEvent(event) {
closed = true
continue
}
}
// newEvent returns an platform-independent Event based on kqueue Fflags.
-func newEvent(name string, mask uint32) Event {
+func (w *Watcher) newEvent(name string, mask uint32) Event {
e := Event{Name: name}
if mask&unix.NOTE_DELETE == unix.NOTE_DELETE {
e.Op |= Remove
// Get all files
files, err := ioutil.ReadDir(dirPath)
if err != nil {
- select {
- case w.Errors <- err:
- case <-w.done:
+ if !w.sendError(err) {
return
}
}
w.mu.Unlock()
if !doesExist {
// Send create event
- select {
- case w.Events <- Event{Name: filePath, Op: Create}:
- case <-w.done:
+ if !w.sendEvent(Event{Name: filePath, Op: Create}) {
return
}
}
return w.addWatch(name, noteAllEvents)
}
-// kqueue creates a new kernel event queue and returns a descriptor.
-//
-// This registers a new event on closepipe, which will trigger an event when
-// it's closed. This way we can use kevent() without timeout/polling; without
-// the closepipe, it would block forever and we wouldn't be able to stop it at
-// all.
-func kqueue() (kq int, closepipe [2]int, err error) {
- kq, err = unix.Kqueue()
- if kq == -1 {
- return kq, closepipe, err
- }
-
- // Register the close pipe.
- err = unix.Pipe(closepipe[:])
- if err != nil {
- unix.Close(kq)
- return kq, closepipe, err
- }
-
- // Register changes to listen on the closepipe.
- changes := make([]unix.Kevent_t, 1)
- // SetKevent converts int to the platform-specific types.
- unix.SetKevent(&changes[0], closepipe[0], unix.EVFILT_READ,
- unix.EV_ADD|unix.EV_ENABLE|unix.EV_ONESHOT)
-
- ok, err := unix.Kevent(kq, changes, nil, nil)
- if ok == -1 {
- unix.Close(kq)
- unix.Close(closepipe[0])
- unix.Close(closepipe[1])
- return kq, closepipe, err
- }
- return kq, closepipe, nil
-}
-
// Register events with the queue.
-func register(kq int, fds []int, flags int, fflags uint32) error {
+func (w *Watcher) register(fds []int, flags int, fflags uint32) error {
changes := make([]unix.Kevent_t, len(fds))
for i, fd := range fds {
// SetKevent converts int to the platform-specific types.
}
// Register the events.
- success, err := unix.Kevent(kq, changes, nil, nil)
+ success, err := unix.Kevent(w.kq, changes, nil, nil)
if success == -1 {
return err
}
}
// read retrieves pending events, or waits until an event occurs.
-// A timeout of nil blocks indefinitely, while 0 polls the queue.
-func read(kq int, events []unix.Kevent_t) ([]unix.Kevent_t, error) {
- n, err := unix.Kevent(kq, nil, events, nil)
+func (w *Watcher) read(events []unix.Kevent_t) ([]unix.Kevent_t, error) {
+ n, err := unix.Kevent(w.kq, nil, events, nil)
if err != nil {
return nil, err
}
return w, nil
}
+func (w *Watcher) sendEvent(name string, mask uint64) bool {
+ if mask == 0 {
+ return false
+ }
+
+ event := w.newEvent(name, uint32(mask))
+ select {
+ case ch := <-w.quit:
+ w.quit <- ch
+ case w.Events <- event:
+ }
+ return true
+}
+
+// Returns true if the error was sent, or false if watcher is closed.
+func (w *Watcher) sendError(err error) bool {
+ select {
+ case w.Errors <- err:
+ return true
+ case <-w.quit:
+ }
+ return false
+}
+
// Close removes all watches and closes the events channel.
func (w *Watcher) Close() error {
w.mu.Lock()
return entries
}
+// These options are from the old golang.org/x/exp/winfsnotify, where you could
+// add various options to the watch. This has long since been removed.
+//
+// The "sys" in the name is misleading as they're not part of any "system".
const (
// Options for AddWatch
sysFSONESHOT = 0x80000000
sysFSQOVERFLOW = 0x4000
)
-func newEvent(name string, mask uint32) Event {
+func (w *Watcher) newEvent(name string, mask uint32) Event {
e := Event{Name: name}
if mask&sysFSCREATE == sysFSCREATE || mask&sysFSMOVEDTO == sysFSMOVEDTO {
e.Op |= Create
return nil
}
-func getDir(pathname string) (dir string, err error) {
+func (w *Watcher) getDir(pathname string) (dir string, err error) {
attr, err := windows.GetFileAttributes(windows.StringToUTF16Ptr(pathname))
if err != nil {
return "", os.NewSyscallError("GetFileAttributes", err)
return
}
-func getIno(path string) (ino *inode, err error) {
+func (w *Watcher) getIno(path string) (ino *inode, err error) {
h, err := windows.CreateFile(windows.StringToUTF16Ptr(path),
windows.FILE_LIST_DIRECTORY,
windows.FILE_SHARE_READ|windows.FILE_SHARE_WRITE|windows.FILE_SHARE_DELETE,
// Must run within the I/O thread.
func (w *Watcher) addWatch(pathname string, flags uint64) error {
- dir, err := getDir(pathname)
+ dir, err := w.getDir(pathname)
if err != nil {
return err
}
return nil
}
- ino, err := getIno(dir)
+ ino, err := w.getIno(dir)
if err != nil {
return err
}
// Must run within the I/O thread.
func (w *Watcher) remWatch(pathname string) error {
- dir, err := getDir(pathname)
+ dir, err := w.getDir(pathname)
if err != nil {
return err
}
- ino, err := getIno(dir)
+ ino, err := w.getIno(dir)
if err != nil {
return err
}
err = windows.CloseHandle(ino.handle)
if err != nil {
- w.Errors <- os.NewSyscallError("CloseHandle", err)
+ w.sendError(os.NewSyscallError("CloseHandle", err))
}
if watch == nil {
return fmt.Errorf("%w: %s", ErrNonExistentWatch, pathname)
func (w *Watcher) startRead(watch *watch) error {
err := windows.CancelIo(watch.ino.handle)
if err != nil {
- w.Errors <- os.NewSyscallError("CancelIo", err)
+ w.sendError(os.NewSyscallError("CancelIo", err))
w.deleteWatch(watch)
}
- mask := toWindowsFlags(watch.mask)
+ mask := w.toWindowsFlags(watch.mask)
for _, m := range watch.names {
- mask |= toWindowsFlags(m)
+ mask |= w.toWindowsFlags(m)
}
if mask == 0 {
err := windows.CloseHandle(watch.ino.handle)
if err != nil {
- w.Errors <- os.NewSyscallError("CloseHandle", err)
+ w.sendError(os.NewSyscallError("CloseHandle", err))
}
w.mu.Lock()
delete(w.watches[watch.ino.volume], watch.ino.index)
switch qErr {
case windows.ERROR_MORE_DATA:
if watch == nil {
- w.Errors <- errors.New("ERROR_MORE_DATA has unexpectedly null lpOverlapped buffer")
+ w.sendError(errors.New("ERROR_MORE_DATA has unexpectedly null lpOverlapped buffer"))
} else {
// The i/o succeeded but the buffer is full.
// In theory we should be building up a full packet.
// CancelIo was called on this handle
continue
default:
- w.Errors <- os.NewSyscallError("GetQueuedCompletionPort", qErr)
+ w.sendError(os.NewSyscallError("GetQueuedCompletionPort", qErr))
continue
case nil:
}
var offset uint32
for {
if n == 0 {
- w.Events <- newEvent("", sysFSQOVERFLOW)
- w.Errors <- errors.New("short read in readEvents()")
+ w.Events <- w.newEvent("", sysFSQOVERFLOW)
+ w.sendError(errors.New("short read in readEvents()"))
break
}
// Point "raw" to the event in the buffer
raw := (*windows.FileNotifyInformation)(unsafe.Pointer(&watch.buf[offset]))
- // TODO: Consider using unsafe.Slice that is available from go1.17
- // https://stackoverflow.com/questions/51187973/how-to-create-an-array-or-a-slice-from-an-array-unsafe-pointer-in-golang
- // instead of using a fixed windows.MAX_PATH buf, we create a buf that is the size of the path name
+
+ // Create a buf that is the size of the path name
size := int(raw.FileNameLength / 2)
var buf []uint16
+ // TODO: Use unsafe.Slice in Go 1.17; https://stackoverflow.com/questions/51187973
sh := (*reflect.SliceHeader)(unsafe.Pointer(&buf))
sh.Data = uintptr(unsafe.Pointer(&raw.FileName))
sh.Len = size
w.sendEvent(fullname, watch.names[name]&sysFSIGNORED)
delete(watch.names, name)
}
- if w.sendEvent(fullname, watch.mask&toFSnotifyFlags(raw.Action)) {
+ if w.sendEvent(fullname, watch.mask&w.toFSnotifyFlags(raw.Action)) {
if watch.mask&sysFSONESHOT != 0 {
watch.mask = 0
}
// Error!
if offset >= n {
- w.Errors <- errors.New("Windows system assumed buffer larger than it is, events have likely been missed.")
+ w.sendError(errors.New(
+ "Windows system assumed buffer larger than it is, events have likely been missed."))
break
}
}
if err := w.startRead(watch); err != nil {
- w.Errors <- err
+ w.sendError(err)
}
}
}
-func (w *Watcher) sendEvent(name string, mask uint64) bool {
- if mask == 0 {
- return false
- }
- event := newEvent(name, uint32(mask))
- select {
- case ch := <-w.quit:
- w.quit <- ch
- case w.Events <- event:
- }
- return true
-}
-
-func toWindowsFlags(mask uint64) uint32 {
+func (w *Watcher) toWindowsFlags(mask uint64) uint32 {
var m uint32
if mask&sysFSACCESS != 0 {
m |= windows.FILE_NOTIFY_CHANGE_LAST_ACCESS
return m
}
-func toFSnotifyFlags(action uint32) uint64 {
+func (w *Watcher) toFSnotifyFlags(action uint32) uint64 {
switch action {
case windows.FILE_ACTION_ADDED:
return sysFSCREATE