"os"
"path/filepath"
"sync"
- "time"
"golang.org/x/sys/unix"
)
type Watcher struct {
Events chan Event
Errors chan error
- done chan struct{} // Channel for sending a "quit message" to the reader goroutine
+ done chan struct{}
- kq int // File descriptor (as returned by the kqueue() syscall).
+ kq int // File descriptor (as returned by the kqueue() syscall).
+ closepipe [2]int // Pipe used for closing.
mu sync.Mutex // Protects access to watcher data
watches map[string]int // Watched file descriptors (key: path).
// NewWatcher establishes a new watcher with the underlying OS and begins waiting for events.
func NewWatcher() (*Watcher, error) {
- kq, err := kqueue()
+ kq, closepipe, err := kqueue()
if err != nil {
return nil, err
}
w := &Watcher{
kq: kq,
+ closepipe: closepipe,
watches: make(map[string]int),
watchesByDir: make(map[string]map[int]struct{}),
dirFlags: make(map[string]uint32),
w.Remove(name)
}
- // send a "quit" message to the reader goroutine
- close(w.done)
+ // Send "quit" message to the reader goroutine.
+ unix.Close(w.closepipe[1])
return nil
}
return fmt.Errorf("%w: %s", ErrNonExistentWatch, name)
}
- const registerRemove = unix.EV_DELETE
- if err := register(w.kq, []int{watchfd}, registerRemove, 0); err != nil {
+ err := register(w.kq, []int{watchfd}, unix.EV_DELETE, 0)
+ if err != nil {
return err
}
// Watch all events (except NOTE_EXTEND, NOTE_LINK, NOTE_REVOKE)
const noteAllEvents = unix.NOTE_DELETE | unix.NOTE_WRITE | unix.NOTE_ATTRIB | unix.NOTE_RENAME
-// keventWaitTime to block on each read from kevent
-var keventWaitTime = durationToTimespec(100 * time.Millisecond)
-
// addWatch adds name to the watched file set.
// The flags are interpreted as described in kevent(2).
// Returns the real path to the file which was added, if any, which may be different from the one passed in the case of symlinks.
isDir = fi.IsDir()
}
- const registerAdd = unix.EV_ADD | unix.EV_CLEAR | unix.EV_ENABLE
- if err := register(w.kq, []int{watchfd}, registerAdd, flags); err != nil {
+ err := register(w.kq, []int{watchfd}, unix.EV_ADD|unix.EV_CLEAR|unix.EV_ENABLE, flags)
+ if err != nil {
unix.Close(watchfd)
return "", err
}
// Event values that it sends down the Events channel.
func (w *Watcher) readEvents() {
eventBuffer := make([]unix.Kevent_t, 10)
-
-loop:
- for {
- // See if there is a message on the "done" channel
- select {
- case <-w.done:
- break loop
- default:
+ defer func() {
+ err := unix.Close(w.kq)
+ if err != nil {
+ w.Errors <- err
}
-
- // Get new events
- kevents, err := read(w.kq, eventBuffer, &keventWaitTime)
+ unix.Close(w.closepipe[0])
+ close(w.done)
+ close(w.Events)
+ close(w.Errors)
+ }()
+
+ for closed := false; !closed; {
+ kevents, err := read(w.kq, eventBuffer)
// EINTR is okay, the syscall was interrupted before timeout expired.
if err != nil && err != unix.EINTR {
select {
case w.Errors <- err:
case <-w.done:
- break loop
+ closed = true
+ continue
}
continue
}
// Flush the events we received to the Events channel
- for len(kevents) > 0 {
+ for _, kevent := range kevents {
var (
- kevent = &kevents[0]
watchfd = int(kevent.Ident)
mask = uint32(kevent.Fflags)
)
+
+ // Shut down the loop when the pipe is closed, but only after all
+ // other events have been processed.
+ if watchfd == w.closepipe[0] {
+ closed = true
+ continue
+ }
+
w.mu.Lock()
path := w.paths[watchfd]
w.mu.Unlock()
select {
case w.Events <- event:
case <-w.done:
- break loop
+ closed = true
+ continue
}
}
}
}
}
-
- // Move to next event
- kevents = kevents[1:]
}
}
-
- // cleanup
- err := unix.Close(w.kq)
- if err != nil {
- // only way the previous loop breaks is if w.done was closed so we need to async send to w.Errors.
- select {
- case w.Errors <- err:
- default:
- }
- }
- close(w.Events)
- close(w.Errors)
}
// newEvent returns an platform-independent Event based on kqueue Fflags.
}
// kqueue creates a new kernel event queue and returns a descriptor.
-func kqueue() (kq int, err error) {
+//
+// This registers a new event on closepipe, which will trigger an event when
+// it's closed. This way we can use kevent() without timeout/polling; without
+// the closepipe, it would block forever and we wouldn't be able to stop it at
+// all.
+func kqueue() (kq int, closepipe [2]int, err error) {
kq, err = unix.Kqueue()
if kq == -1 {
- return kq, err
+ return kq, closepipe, err
+ }
+
+ // Register the close pipe.
+ err = unix.Pipe(closepipe[:])
+ if err != nil {
+ unix.Close(kq)
+ return kq, closepipe, err
+ }
+
+ // Register changes to listen on the closepipe.
+ changes := make([]unix.Kevent_t, 1)
+ // SetKevent converts int to the platform-specific types.
+ unix.SetKevent(&changes[0], closepipe[0], unix.EVFILT_READ,
+ unix.EV_ADD|unix.EV_ENABLE|unix.EV_ONESHOT)
+
+ ok, err := unix.Kevent(kq, changes, nil, nil)
+ if ok == -1 {
+ unix.Close(kq)
+ unix.Close(closepipe[0])
+ unix.Close(closepipe[1])
+ return kq, closepipe, err
}
- return kq, nil
+ return kq, closepipe, nil
}
-// register events with the queue
+// Register events with the queue.
func register(kq int, fds []int, flags int, fflags uint32) error {
changes := make([]unix.Kevent_t, len(fds))
-
for i, fd := range fds {
- // SetKevent converts int to the platform-specific types:
+ // SetKevent converts int to the platform-specific types.
unix.SetKevent(&changes[i], fd, unix.EVFILT_VNODE, flags)
changes[i].Fflags = fflags
}
- // register the events
+ // Register the events.
success, err := unix.Kevent(kq, changes, nil, nil)
if success == -1 {
return err
// read retrieves pending events, or waits until an event occurs.
// A timeout of nil blocks indefinitely, while 0 polls the queue.
-func read(kq int, events []unix.Kevent_t, timeout *unix.Timespec) ([]unix.Kevent_t, error) {
- n, err := unix.Kevent(kq, nil, events, timeout)
+func read(kq int, events []unix.Kevent_t) ([]unix.Kevent_t, error) {
+ n, err := unix.Kevent(kq, nil, events, nil)
if err != nil {
return nil, err
}
return events[0:n], nil
}
-
-// durationToTimespec prepares a timeout value
-func durationToTimespec(d time.Duration) unix.Timespec {
- return unix.NsecToTimespec(d.Nanoseconds())
-}