]> go.fuhry.dev Git - fsnotify.git/commitdiff
add low-level kqueue functions
authorNathan Youngman <git@nathany.com>
Sun, 14 Sep 2014 16:54:43 +0000 (10:54 -0600)
committerNathan Youngman <git@nathany.com>
Wed, 24 Sep 2014 21:34:04 +0000 (15:34 -0600)
to separate out that functionality

kqueue.go

index 5ef1346c0d49bf3d93306135554c71a0236d07d7..acae4c7bce9d901745002bf95ab2a6a08da6e580 100644 (file)
--- a/kqueue.go
+++ b/kqueue.go
@@ -14,6 +14,7 @@ import (
        "path/filepath"
        "sync"
        "syscall"
+       "time"
 )
 
 // Watcher watches a set of files, delivering events to a channel.
@@ -39,12 +40,13 @@ type Watcher struct {
 
 // NewWatcher establishes a new watcher with the underlying OS and begins waiting for events.
 func NewWatcher() (*Watcher, error) {
-       fd, errno := syscall.Kqueue()
-       if fd == -1 {
-               return nil, os.NewSyscallError("kqueue", errno)
+       kq, err := kqueue()
+       if err != nil {
+               return nil, err
        }
+
        w := &Watcher{
-               kq:              fd,
+               kq:              kq,
                watches:         make(map[string]int),
                enFlags:         make(map[string]uint32),
                paths:           make(map[int]string),
@@ -99,16 +101,12 @@ func (w *Watcher) Remove(name string) error {
        if !ok {
                return fmt.Errorf("can't remove non-existent kevent watch for: %s", name)
        }
-       var kbuf [1]syscall.Kevent_t
-       watchEntry := &kbuf[0]
-       syscall.SetKevent(watchEntry, watchfd, syscall.EVFILT_VNODE, syscall.EV_DELETE)
-       entryFlags := watchEntry.Flags
-       success, errno := syscall.Kevent(w.kq, kbuf[:], nil, nil)
-       if success == -1 {
-               return os.NewSyscallError("kevent_rm_watch", errno)
-       } else if (entryFlags & syscall.EV_ERROR) == syscall.EV_ERROR {
-               return errors.New("kevent rm error")
+
+       const registerRemove = syscall.EV_DELETE
+       if err := register(w.kq, []int{watchfd}, registerRemove, 0); err != nil {
+               return err
        }
+
        syscall.Close(watchfd)
        w.wmut.Lock()
        delete(w.watches, name)
@@ -151,11 +149,11 @@ func (w *Watcher) Remove(name string) error {
 const (
        // Watch all events (except NOTE_EXTEND, NOTE_LINK, NOTE_REVOKE)
        noteAllEvents = syscall.NOTE_DELETE | syscall.NOTE_WRITE | syscall.NOTE_ATTRIB | syscall.NOTE_RENAME
-
-       // Block for 100 ms on each call to kevent
-       keventWaitTime = 100e6
 )
 
+// keventWaitTime to block on each read from kevent
+var keventWaitTime = durationToTimespec(100 * time.Millisecond)
+
 // addWatch adds path to the watched file set.
 // The flags are interpreted as described in kevent(2).
 func (w *Watcher) addWatch(path string, flags uint32) error {
@@ -231,16 +229,9 @@ func (w *Watcher) addWatch(path string, flags uint32) error {
        w.enFlags[path] = flags
        w.enmut.Unlock()
 
-       var kbuf [1]syscall.Kevent_t
-       watchEntry := &kbuf[0]
-       watchEntry.Fflags = flags
-       syscall.SetKevent(watchEntry, watchfd, syscall.EVFILT_VNODE, syscall.EV_ADD|syscall.EV_CLEAR)
-       entryFlags := watchEntry.Flags
-       success, errno := syscall.Kevent(w.kq, kbuf[:], nil, nil)
-       if success == -1 {
-               return errno
-       } else if (entryFlags & syscall.EV_ERROR) == syscall.EV_ERROR {
-               return errors.New("kevent add error")
+       const registerAdd = syscall.EV_ADD | syscall.EV_CLEAR | syscall.EV_ENABLE
+       if err := register(w.kq, []int{watchfd}, registerAdd, flags); err != nil {
+               return err
        }
 
        if watchDir {
@@ -255,16 +246,7 @@ func (w *Watcher) addWatch(path string, flags uint32) error {
 // readEvents reads from the kqueue file descriptor, converts the
 // received events into Event objects and sends them via the Events channel
 func (w *Watcher) readEvents() {
-       var (
-               keventbuf [10]syscall.Kevent_t // Event buffer
-               kevents   []syscall.Kevent_t   // Received events
-               twait     *syscall.Timespec    // Time to block waiting for events
-               n         int                  // Number of events returned from kevent
-               errno     error                // Syscall errno
-       )
-       kevents = keventbuf[0:0]
-       twait = new(syscall.Timespec)
-       *twait = syscall.NsecToTimespec(keventWaitTime)
+       eventBuffer := make([]syscall.Kevent_t, 10)
 
        for {
                // See if there is a message on the "done" channel
@@ -286,20 +268,11 @@ func (w *Watcher) readEvents() {
                }
 
                // Get new events
-               if len(kevents) == 0 {
-                       n, errno = syscall.Kevent(w.kq, nil, keventbuf[:], twait)
-
-                       // EINTR is okay, basically the syscall was interrupted before
-                       // timeout expired.
-                       if errno != nil && errno != syscall.EINTR {
-                               w.Errors <- os.NewSyscallError("kevent", errno)
-                               continue
-                       }
-
-                       // Received some events
-                       if n > 0 {
-                               kevents = keventbuf[0:n]
-                       }
+               kevents, err := read(w.kq, eventBuffer, &keventWaitTime)
+               // EINTR is okay, the syscall was interrupted before timeout expired.
+               if err != nil && err != syscall.EINTR {
+                       w.Errors <- os.NewSyscallError("Kevent", err)
+                       continue
                }
 
                // Flush the events we received to the Events channel
@@ -477,3 +450,45 @@ func (w *Watcher) sendDirectoryChangeEvents(dirPath string) {
                w.femut.Unlock()
        }
 }
+
+// kqueue creates a new kernel event queue and returns a descriptor.
+func kqueue() (kq int, err error) {
+       kq, err = syscall.Kqueue()
+       if kq == -1 {
+               return kq, os.NewSyscallError("Kqueue", err)
+       }
+       return kq, nil
+}
+
+// register events with the queue
+func register(kq int, fds []int, flags int, fflags uint32) error {
+       changes := make([]syscall.Kevent_t, len(fds))
+
+       for i, fd := range fds {
+               // SetKevent converts int to the platform-specific types:
+               syscall.SetKevent(&changes[i], fd, syscall.EVFILT_VNODE, flags)
+               changes[i].Fflags = fflags
+       }
+
+       // register the events
+       success, err := syscall.Kevent(kq, changes, nil, nil)
+       if success == -1 {
+               return os.NewSyscallError("Kevent", err)
+       }
+       return nil
+}
+
+// read retrieves pending events
+// A timeout of nil blocks indefinitely, while 0 polls the queue.
+func read(kq int, events []syscall.Kevent_t, timeout *syscall.Timespec) ([]syscall.Kevent_t, error) {
+       n, err := syscall.Kevent(kq, nil, events, timeout)
+       if err != nil {
+               return nil, err
+       }
+       return events[0:n], nil
+}
+
+// durationToTimespec prepares a timeout value
+func durationToTimespec(d time.Duration) syscall.Timespec {
+       return syscall.NsecToTimespec(d.Nanoseconds())
+}