type Watcher struct {
Events chan Event
Errors chan error
+ done chan bool // Channel for sending a "quit message" to the reader goroutine
- kq int // File descriptor (as returned by the kqueue() syscall).
+ kq int // File descriptor (as returned by the kqueue() syscall).
+
+ mu sync.Mutex // Protects access to watcher data
watches map[string]int // Map of watched file descriptors (key: path).
externalWatches map[string]bool // Map of watches added by user of the library.
dirFlags map[string]uint32 // Map of watched directories to fflags used in kqueue.
paths map[int]pathInfo // Map file descriptors to path names for processing kqueue events.
fileExists map[string]bool // Keep track of if we know this file exists (to stop duplicate create events).
- done chan bool // Channel for sending a "quit message" to the reader goroutine
isClosed bool // Set to true when Close() is first called
-
- mu sync.Mutex // Mutex for the Watcher itself (isClosed).
-
- wmut sync.Mutex // Protects access to watches.
- pmut sync.Mutex // Protects access to paths.
- ewmut sync.Mutex // Protects access to externalWatches.
-
- dirmut sync.Mutex // Protects access to dirFlags.
- femut sync.Mutex // Protects access to fileExists.
}
type pathInfo struct {
// Send "quit" message to the reader goroutine:
w.done <- true
- w.wmut.Lock()
+
+ w.mu.Lock()
ws := w.watches
- w.wmut.Unlock()
+ w.mu.Unlock()
for name := range ws {
w.Remove(name)
}
// Add starts watching the named file or directory (non-recursively).
func (w *Watcher) Add(name string) error {
- w.ewmut.Lock()
+ w.mu.Lock()
w.externalWatches[name] = true
- w.ewmut.Unlock()
+ w.mu.Unlock()
return w.addWatch(name, noteAllEvents)
}
// Remove stops watching the the named file or directory (non-recursively).
func (w *Watcher) Remove(name string) error {
name = filepath.Clean(name)
- w.wmut.Lock()
+ w.mu.Lock()
watchfd, ok := w.watches[name]
- w.wmut.Unlock()
+ w.mu.Unlock()
if !ok {
return fmt.Errorf("can't remove non-existent kevent watch for: %s", name)
}
syscall.Close(watchfd)
- w.wmut.Lock()
- delete(w.watches, name)
- w.wmut.Unlock()
- w.dirmut.Lock()
- delete(w.dirFlags, name)
- w.dirmut.Unlock()
- w.pmut.Lock()
+ w.mu.Lock()
isDir := w.paths[watchfd].isDir
+ delete(w.watches, name)
delete(w.paths, watchfd)
- w.pmut.Unlock()
+ delete(w.dirFlags, name)
+ w.mu.Unlock()
// Find all watched paths that are in this directory that are not external.
if isDir {
var pathsToRemove []string
- w.pmut.Lock()
+ w.mu.Lock()
for _, path := range w.paths {
wdir, _ := filepath.Split(path.name)
- if filepath.Clean(wdir) == filepath.Clean(name) {
- w.ewmut.Lock()
+ if filepath.Clean(wdir) == name {
if !w.externalWatches[path.name] {
pathsToRemove = append(pathsToRemove, path.name)
}
- w.ewmut.Unlock()
}
}
- w.pmut.Unlock()
+ w.mu.Unlock()
for _, name := range pathsToRemove {
// Since these are internal, not much sense in propagating error
// to the user, as that will just confuse them with an error about
// addWatch adds name to the watched file set.
// The flags are interpreted as described in kevent(2).
func (w *Watcher) addWatch(name string, flags uint32) error {
+ var isDir bool
+ // Make ./name and name equivalent
+ name = filepath.Clean(name)
+
w.mu.Lock()
if w.isClosed {
w.mu.Unlock()
return errors.New("kevent instance already closed")
}
- w.mu.Unlock()
-
- // Make ./name and name equivalent
- name = filepath.Clean(name)
-
- w.wmut.Lock()
watchfd, alreadyWatching := w.watches[name]
- w.wmut.Unlock()
-
- var isDir bool
-
+ // We already have a watch, but we can still override flags.
if alreadyWatching {
- // We already have a watch, but we can still override flags
- w.pmut.Lock()
isDir = w.paths[watchfd].isDir
- w.pmut.Unlock()
- } else {
+ }
+ w.mu.Unlock()
+
+ if !alreadyWatching {
fi, err := os.Lstat(name)
if err != nil {
return err
}
- // don't watch socket
+ // Don't watch sockets.
if fi.Mode()&os.ModeSocket == os.ModeSocket {
return nil
}
}
if !alreadyWatching {
- w.wmut.Lock()
+ w.mu.Lock()
w.watches[name] = watchfd
- w.wmut.Unlock()
-
- w.pmut.Lock()
w.paths[watchfd] = pathInfo{name: name, isDir: isDir}
- w.pmut.Unlock()
+ w.mu.Unlock()
}
if isDir {
// Watch the directory if it has not been watched before,
// or if it was watched before, but perhaps only a NOTE_DELETE (watchDirectoryFiles)
- w.dirmut.Lock()
+ w.mu.Lock()
watchDir := (flags&syscall.NOTE_WRITE) == syscall.NOTE_WRITE &&
(!alreadyWatching || (w.dirFlags[name]&syscall.NOTE_WRITE) != syscall.NOTE_WRITE)
// Store flags so this watch can be updated later
w.dirFlags[name] = flags
- w.dirmut.Unlock()
+ w.mu.Unlock()
if watchDir {
if err := w.watchDirectoryFiles(name); err != nil {
return nil
}
-// readEvents reads from the kqueue file descriptor, converts the
-// received events into Event objects and sends them via the Events channel
+// readEvents reads from kqueue and converts the received kevents into
+// Event values that it sends down the Events channel.
func (w *Watcher) readEvents() {
eventBuffer := make([]syscall.Kevent_t, 10)
// See if there is a message on the "done" channel
select {
case <-w.done:
- errno := syscall.Close(w.kq)
- if errno != nil {
- w.Errors <- os.NewSyscallError("close", errno)
+ err := syscall.Close(w.kq)
+ if err != nil {
+ w.Errors <- os.NewSyscallError("close", err)
}
close(w.Events)
close(w.Errors)
// Flush the events we received to the Events channel
for len(kevents) > 0 {
- watchEvent := &kevents[0]
- watchfd := int(watchEvent.Ident)
- mask := uint32(watchEvent.Fflags)
-
- w.pmut.Lock()
+ kevent := &kevents[0]
+ watchfd := int(kevent.Ident)
+ mask := uint32(kevent.Fflags)
+ w.mu.Lock()
path := w.paths[watchfd]
- w.pmut.Unlock()
-
+ w.mu.Unlock()
event := newEvent(path.name, mask)
if path.isDir && !(event.Op&Remove == Remove) {
- // Double check to make sure the directory exist. This can happen when
+ // Double check to make sure the directory exists. This can happen when
// we do a rm -fr on a recursively watched folders and we receive a
// modification event first but the folder has been deleted and later
// receive the delete event
w.Events <- event
}
- // Move to next event
- kevents = kevents[1:]
-
- if event.Op&Rename == Rename {
+ if event.Op&Rename == Rename || event.Op&Remove == Remove {
w.Remove(event.Name)
- w.femut.Lock()
+ w.mu.Lock()
delete(w.fileExists, event.Name)
- w.femut.Unlock()
+ w.mu.Unlock()
}
if event.Op&Remove == Remove {
- w.Remove(event.Name)
- w.femut.Lock()
- delete(w.fileExists, event.Name)
- w.femut.Unlock()
-
// Look for a file that may have overwritten this.
// For example, mv f1 f2 will delete f2, then create f2.
fileDir, _ := filepath.Split(event.Name)
fileDir = filepath.Clean(fileDir)
- w.wmut.Lock()
+ w.mu.Lock()
_, found := w.watches[fileDir]
- w.wmut.Unlock()
+ w.mu.Unlock()
if found {
// make sure the directory exists before we watch for changes. When we
// do a recursive watch and perform rm -fr, the parent directory might
}
}
}
+
+ // Move to next event
+ kevents = kevents[1:]
}
}
}
return err
}
- w.femut.Lock()
+ w.mu.Lock()
w.fileExists[filePath] = true
- w.femut.Unlock()
+ w.mu.Unlock()
}
return nil
// Search for new files
for _, fileInfo := range files {
filePath := filepath.Join(dirPath, fileInfo.Name())
- w.femut.Lock()
+ w.mu.Lock()
_, doesExist := w.fileExists[filePath]
- w.femut.Unlock()
+ w.mu.Unlock()
if !doesExist {
// Send create event
w.Events <- newCreateEvent(filePath)
return
}
- w.femut.Lock()
+ w.mu.Lock()
w.fileExists[filePath] = true
- w.femut.Unlock()
+ w.mu.Unlock()
}
}
if fileInfo.IsDir() {
// mimic Linux providing delete events for subdirectories
// but preserve the flags used if currently watching subdirectory
- w.dirmut.Lock()
+ w.mu.Lock()
flags := w.dirFlags[name]
- w.dirmut.Unlock()
+ w.mu.Unlock()
flags |= syscall.NOTE_DELETE
return w.addWatch(name, flags)
return nil
}
-// read retrieves pending events
+// read retrieves pending events, or waits until an event occurs.
// A timeout of nil blocks indefinitely, while 0 polls the queue.
func read(kq int, events []syscall.Kevent_t, timeout *syscall.Timespec) ([]syscall.Kevent_t, error) {
n, err := syscall.Kevent(kq, nil, events, timeout)