From: Nathan Youngman Date: Sun, 14 Sep 2014 16:54:43 +0000 (-0600) Subject: add low-level kqueue functions X-Git-Tag: v1.7.2~263 X-Git-Url: https://go.fuhry.dev/?a=commitdiff_plain;h=1f65e2ef12aa538ba66b432a0c60279f4406add1;p=fsnotify.git add low-level kqueue functions to separate out that functionality --- diff --git a/kqueue.go b/kqueue.go index 5ef1346..acae4c7 100644 --- a/kqueue.go +++ b/kqueue.go @@ -14,6 +14,7 @@ import ( "path/filepath" "sync" "syscall" + "time" ) // Watcher watches a set of files, delivering events to a channel. @@ -39,12 +40,13 @@ type Watcher struct { // NewWatcher establishes a new watcher with the underlying OS and begins waiting for events. func NewWatcher() (*Watcher, error) { - fd, errno := syscall.Kqueue() - if fd == -1 { - return nil, os.NewSyscallError("kqueue", errno) + kq, err := kqueue() + if err != nil { + return nil, err } + w := &Watcher{ - kq: fd, + kq: kq, watches: make(map[string]int), enFlags: make(map[string]uint32), paths: make(map[int]string), @@ -99,16 +101,12 @@ func (w *Watcher) Remove(name string) error { if !ok { return fmt.Errorf("can't remove non-existent kevent watch for: %s", name) } - var kbuf [1]syscall.Kevent_t - watchEntry := &kbuf[0] - syscall.SetKevent(watchEntry, watchfd, syscall.EVFILT_VNODE, syscall.EV_DELETE) - entryFlags := watchEntry.Flags - success, errno := syscall.Kevent(w.kq, kbuf[:], nil, nil) - if success == -1 { - return os.NewSyscallError("kevent_rm_watch", errno) - } else if (entryFlags & syscall.EV_ERROR) == syscall.EV_ERROR { - return errors.New("kevent rm error") + + const registerRemove = syscall.EV_DELETE + if err := register(w.kq, []int{watchfd}, registerRemove, 0); err != nil { + return err } + syscall.Close(watchfd) w.wmut.Lock() delete(w.watches, name) @@ -151,11 +149,11 @@ func (w *Watcher) Remove(name string) error { const ( // Watch all events (except NOTE_EXTEND, NOTE_LINK, NOTE_REVOKE) noteAllEvents = syscall.NOTE_DELETE | syscall.NOTE_WRITE | syscall.NOTE_ATTRIB | syscall.NOTE_RENAME - - // Block for 100 ms on each call to kevent - keventWaitTime = 100e6 ) +// keventWaitTime to block on each read from kevent +var keventWaitTime = durationToTimespec(100 * time.Millisecond) + // addWatch adds path to the watched file set. // The flags are interpreted as described in kevent(2). func (w *Watcher) addWatch(path string, flags uint32) error { @@ -231,16 +229,9 @@ func (w *Watcher) addWatch(path string, flags uint32) error { w.enFlags[path] = flags w.enmut.Unlock() - var kbuf [1]syscall.Kevent_t - watchEntry := &kbuf[0] - watchEntry.Fflags = flags - syscall.SetKevent(watchEntry, watchfd, syscall.EVFILT_VNODE, syscall.EV_ADD|syscall.EV_CLEAR) - entryFlags := watchEntry.Flags - success, errno := syscall.Kevent(w.kq, kbuf[:], nil, nil) - if success == -1 { - return errno - } else if (entryFlags & syscall.EV_ERROR) == syscall.EV_ERROR { - return errors.New("kevent add error") + const registerAdd = syscall.EV_ADD | syscall.EV_CLEAR | syscall.EV_ENABLE + if err := register(w.kq, []int{watchfd}, registerAdd, flags); err != nil { + return err } if watchDir { @@ -255,16 +246,7 @@ func (w *Watcher) addWatch(path string, flags uint32) error { // readEvents reads from the kqueue file descriptor, converts the // received events into Event objects and sends them via the Events channel func (w *Watcher) readEvents() { - var ( - keventbuf [10]syscall.Kevent_t // Event buffer - kevents []syscall.Kevent_t // Received events - twait *syscall.Timespec // Time to block waiting for events - n int // Number of events returned from kevent - errno error // Syscall errno - ) - kevents = keventbuf[0:0] - twait = new(syscall.Timespec) - *twait = syscall.NsecToTimespec(keventWaitTime) + eventBuffer := make([]syscall.Kevent_t, 10) for { // See if there is a message on the "done" channel @@ -286,20 +268,11 @@ func (w *Watcher) readEvents() { } // Get new events - if len(kevents) == 0 { - n, errno = syscall.Kevent(w.kq, nil, keventbuf[:], twait) - - // EINTR is okay, basically the syscall was interrupted before - // timeout expired. - if errno != nil && errno != syscall.EINTR { - w.Errors <- os.NewSyscallError("kevent", errno) - continue - } - - // Received some events - if n > 0 { - kevents = keventbuf[0:n] - } + kevents, err := read(w.kq, eventBuffer, &keventWaitTime) + // EINTR is okay, the syscall was interrupted before timeout expired. + if err != nil && err != syscall.EINTR { + w.Errors <- os.NewSyscallError("Kevent", err) + continue } // Flush the events we received to the Events channel @@ -477,3 +450,45 @@ func (w *Watcher) sendDirectoryChangeEvents(dirPath string) { w.femut.Unlock() } } + +// kqueue creates a new kernel event queue and returns a descriptor. +func kqueue() (kq int, err error) { + kq, err = syscall.Kqueue() + if kq == -1 { + return kq, os.NewSyscallError("Kqueue", err) + } + return kq, nil +} + +// register events with the queue +func register(kq int, fds []int, flags int, fflags uint32) error { + changes := make([]syscall.Kevent_t, len(fds)) + + for i, fd := range fds { + // SetKevent converts int to the platform-specific types: + syscall.SetKevent(&changes[i], fd, syscall.EVFILT_VNODE, flags) + changes[i].Fflags = fflags + } + + // register the events + success, err := syscall.Kevent(kq, changes, nil, nil) + if success == -1 { + return os.NewSyscallError("Kevent", err) + } + return nil +} + +// read retrieves pending events +// A timeout of nil blocks indefinitely, while 0 polls the queue. +func read(kq int, events []syscall.Kevent_t, timeout *syscall.Timespec) ([]syscall.Kevent_t, error) { + n, err := syscall.Kevent(kq, nil, events, timeout) + if err != nil { + return nil, err + } + return events[0:n], nil +} + +// durationToTimespec prepares a timeout value +func durationToTimespec(d time.Duration) syscall.Timespec { + return syscall.NsecToTimespec(d.Nanoseconds()) +}