]> go.fuhry.dev Git - runtime.git/commitdiff
fsnotify: robust support for detecting changes to k8s config objects
authorDan Fuhry <dan@fuhry.com>
Thu, 15 May 2025 21:35:08 +0000 (17:35 -0400)
committerDan Fuhry <dan@fuhry.com>
Thu, 15 May 2025 21:42:01 +0000 (17:42 -0400)
Kubernetes uses a fairly crazy scheme for files mounted from config maps and in-memory CSI drivers:

  /mnt/file     -> ..data/file
  /mnt/..data   -> ..TIME
  /mnt/..TIME/file (real file)

New versions of the config state are published atomically by doing

  symlink(/mnt/..NEWTIME, /mnt/..data_new)
  rename(/mnt/..data_new, /mnt/..data)

To detect changes to the contents of /mnt/file using inotify instead of polling file contents, we need to detect when /mnt/..data is overwritten.

This requires complex symlink detection and resolution for not only the file but any of the directories in its path, and tracking of final absolute path and inode number to detect when the file was swapped with a different one.

This commit adds that support to fsnotify, along with lots of documentation and tests.

22 files changed:
mtls/fsnotify/all_test.go [new file with mode: 0644]
mtls/fsnotify/fsnotify.go
mtls/fsnotify/fsnotify_test.go [new file with mode: 0644]
mtls/fsnotify/resolve_symlink_tests/01/one [new file with mode: 0644]
mtls/fsnotify/resolve_symlink_tests/02/one [new file with mode: 0644]
mtls/fsnotify/resolve_symlink_tests/02/two [new symlink]
mtls/fsnotify/resolve_symlink_tests/03/one [new file with mode: 0644]
mtls/fsnotify/resolve_symlink_tests/03/three [new symlink]
mtls/fsnotify/resolve_symlink_tests/03/two [new symlink]
mtls/fsnotify/resolve_symlink_tests/04/one [new symlink]
mtls/fsnotify/resolve_symlink_tests/04/two [new symlink]
mtls/fsnotify/resolve_symlink_tests/05/one [new symlink]
mtls/fsnotify/resolve_symlink_tests/05/three [new symlink]
mtls/fsnotify/resolve_symlink_tests/05/two [new symlink]
mtls/fsnotify/resolve_symlink_tests/06/one [new symlink]
mtls/fsnotify/resolve_symlink_tests/07/one [new file with mode: 0644]
mtls/fsnotify/resolve_symlink_tests/07/sub/two [new symlink]
mtls/fsnotify/resolve_symlink_tests/08/one [new symlink]
mtls/fsnotify/resolve_symlink_tests/08/sub/two [new file with mode: 0644]
mtls/fsnotify/util.go [new file with mode: 0644]
mtls/fsnotify/util_test.go [new file with mode: 0644]
utils/checkers/contains_key.go [new file with mode: 0644]

diff --git a/mtls/fsnotify/all_test.go b/mtls/fsnotify/all_test.go
new file mode 100644 (file)
index 0000000..1650107
--- /dev/null
@@ -0,0 +1,35 @@
+package fsnotify
+
+import (
+       "os"
+       "testing"
+
+       . "gopkg.in/check.v1"
+
+       "go.fuhry.dev/runtime/utils/hashset"
+)
+
+type FsnotifySuite struct {
+       tempFiles *hashset.HashSet[string]
+}
+
+func SetUpSuite(s *FsnotifySuite) {
+       s.tempFiles = hashset.NewHashSet[string]()
+}
+
+func TearDownSuite(s *FsnotifySuite) {
+       Debug()
+       for _, f := range s.tempFiles.AsSlice() {
+               os.Remove(f)
+       }
+}
+
+var s = &FsnotifySuite{}
+var _ = Suite(s)
+
+func Test(t *testing.T) {
+       SetUpSuite(s)
+       defer TearDownSuite(s)
+
+       TestingT(t)
+}
index 9fd4d14150522b956a34bdcd47a122d7d2aefa04..3cc71b0e339eef38dae4e3950249b23fcdb6462d 100644 (file)
@@ -1,7 +1,31 @@
 package fsnotify
 
+// Package fsnotify is a high level wrapper around fsnotify which has two goals:
+//
+//   - De-noise the stream of events so that a watcher is only notified for events that actually
+//     matter
+//   - Detect when the file is swapped out for another, either because it was unlinked and
+//     re-created or because a symlink changed in its path.
+//
+// We achieve this by watching the following in addition to the original path:
+//
+//   - The original path's parent directory
+//   - If the orignal path is a symlink: the link target
+//   - If there are multiple levels of symlinks, process recursively
+//   - For the original path and the complete chain of link targets, if any path component is a
+//     symlink: that path component and its parent directory
+//
+// If multiple files in a single directory are watched, ref counting is used to ensure the parent
+// directory is unwatched only after all watched files in it are removed or unsubscribed.
+//
+// All operations except Write and Close are immediately propagated to the handler.
+
 import (
-       "path/filepath"
+       "errors"
+       "fmt"
+       "os"
+       "path"
+       "strings"
        "sync"
 
        "go.fuhry.dev/fsnotify"
@@ -10,6 +34,19 @@ import (
 )
 
 type NotifyFunc = func(string, fsnotify.Op)
+type notifyRule struct {
+       // absPath is the real path of the file after resolving all symlinks both in the filename
+       // itself, and in any of the directories in the path components of the file's absolute path.
+       absPath string
+       // inum is the inode number of the underlying file after resolving
+       // all symlinks.
+       inum int64
+       // upstreams is a HashSet of the original filename, target filename, and all intermediates
+       // in between
+       upstreams *hashset.HashSet[string]
+       callbacks []NotifyFunc
+}
+
 type Op = fsnotify.Op
 
 var (
@@ -24,9 +61,23 @@ var (
 var startWatcherMu sync.Mutex
 var gWatcher *fsnotify.Watcher
 var logger log.Logger
+
+// Map of original path => notifyRule.
+// This tracks paths we were asked to watch. More paths may be watched than the entries here, if
+// this library determines we need to watch other paths to be notified of changes to these files.
+var watchHandlers map[string]*notifyRule
+
+// Reference counts of paths registered in the global fsnotify instance. When an entry reaches zero,
+// we remove the path from the global inotify instance.
+var watched map[string]uint
+
+// Set of watched objects which have seen write events.
+// We wait until we see a CLOSE event to notify the handler.
 var pendingWrites *hashset.HashSet[string]
-var watched *hashset.HashSet[string]
-var watchHandlers map[string][]NotifyFunc
+
+// map of {watched symlink path => rule name}, used to keep track of which symlinks we added
+// watchers on in order to detect changes to watched files
+var symlinkPropagationMap map[string]*hashset.HashSet[string]
 
 func startWatcher() error {
        startWatcherMu.Lock()
@@ -36,9 +87,8 @@ func startWatcher() error {
        }
 
        logger = log.Default().WithPrefix("mtls/fsnotify")
-       watchHandlers = make(map[string][]NotifyFunc, 0)
-       pendingWrites = hashset.NewHashSet[string]()
-       watched = hashset.NewHashSet[string]()
+       initState()
+       // inodes = make(map[string]int64)
        watcher, err := fsnotify.NewWatcher()
        if err != nil {
                log.Panicf("failed to start new global watcher")
@@ -49,24 +99,83 @@ func startWatcher() error {
        return nil
 }
 
-func addWatch(path string) error {
+func initState() {
+       watchHandlers = make(map[string]*notifyRule, 0)
+       watched = make(map[string]uint)
+       pendingWrites = hashset.NewHashSet[string]()
+       symlinkPropagationMap = make(map[string]*hashset.HashSet[string])
+}
+
+func subscribeInternal(filePath string) error {
        startWatcherMu.Lock()
        defer startWatcherMu.Unlock()
 
-       if watched.Contains(path) {
+       if n, ok := watched[filePath]; ok {
+               logger.V(2).Debugf("subscribe(%s): refCount=%d", filePath, n+1)
+               watched[filePath]++
                return nil
        }
 
-       err := gWatcher.Add(path)
+       err := gWatcher.Add(filePath)
        if err != nil {
+               logger.V(2).Errorf("subscribe(%s): ERR=%v", filePath, err)
+               return err
+       }
+       logger.V(2).Debugf("subscribe(%s): refCount=1", filePath)
+       watched[filePath] = 1
+       return nil
+}
+
+func subscribe(filePath string) error {
+       if err := subscribeInternal(filePath); err != nil {
                return err
        }
-       watched.Add(path)
+       if stat, err := os.Lstat(filePath); err == nil && !stat.IsDir() {
+               return subscribeInternal(path.Dir(filePath))
+       }
+       return nil
+}
+
+func unsubscribeInternal(filePath string) error {
+       startWatcherMu.Lock()
+       defer startWatcherMu.Unlock()
+
+       if _, ok := watched[filePath]; !ok {
+               logger.V(2).Debugf("unsubscribe(%s): refCount=!NF", filePath)
+               return nil
+       }
+
+       watched[filePath]--
+       logger.V(2).Debugf("unsubscribe(%s): refCount=%d", filePath, watched[filePath])
+       if watched[filePath] > 0 {
+               return nil
+       }
+       delete(watched, filePath)
+
+       err := gWatcher.Remove(filePath)
+       if err != nil {
+               if errors.Is(err, fsnotify.ErrNonExistentWatch) {
+                       logger.V(2).Debugf("unsubscribe(%s): refCount=!0", filePath)
+                       return nil
+               }
+               logger.V(2).Errorf("unsubscribe(%s): failed to remove a watch: %T(%+v)", filePath, err, err)
+               // log error but don't block dir refcount from being decremented
+               return nil
+       }
+
        return nil
 }
 
+func unsubscribe(filePath string) error {
+       err := unsubscribeInternal(filePath)
+       unsubscribeInternal(path.Dir(filePath))
+       return err
+}
+
 func watcherLoop() {
-       defer gWatcher.Close()
+       defer (func() {
+               gWatcher = nil
+       })()
 
        for {
                select {
@@ -76,6 +185,9 @@ func watcherLoop() {
                        }
                        handleEvent(event)
                case err := <-gWatcher.Errors:
+                       if err == fsnotify.ErrClosed {
+                               return
+                       }
                        if err != nil {
                                logger.Error(err)
                        }
@@ -83,60 +195,317 @@ func watcherLoop() {
        }
 }
 
+// handleEvent processes a single event directly from inotify.
+//
+// The filename on an event can be any number of things:
+//   - the original filename
+//   - the parent directory of the original file
+//   - if the original file is a symlink, it can be anything in the chain between the original
+//     filename and the final resolved path
+//   - in all cases, if any component of the original file's absolute path is a symlink, it can be
+//     that symlink
+//
+// If the fsnotify event corresponds to a symlink that is part of the parent tree for a watched path
+// or one of the links between the original watched path and the final resolved path (inclusive),
+// we call handleEventForPath with the original fsnotify event (containing the path from the event)
+// but ultimately want to trigger the handler for the original path.
 func handleEvent(event fsnotify.Event) {
-       if handlers, ok := watchHandlers[event.Name]; ok {
+       if event.Has(Remove) {
+               // Watchers are automatically removed when a watched file is deleted. Keep local state in
+               // sync.
+               startWatcherMu.Lock()
+               delete(watched, event.Name)
+               startWatcherMu.Unlock()
+       }
+       if _, ok := watchHandlers[event.Name]; ok {
+               // Event occurred on the original path.
+               handleEventForPath(event, event.Name)
+               return
+       } else if children, ok := symlinkPropagationMap[event.Name]; ok {
+               // Event occurred on the underlying file (recursively-resolved absolute path) of a watched
+               // path
+               for _, child := range children.AsSortedSlice() {
+                       logger.V(2).Debugf("propagating %v event on %s to underlying path %s",
+                               event.Op, event.Name, child)
+                       handleEventForPath(event, child)
+               }
+               return
+       } else if links := symlinkPropagationChildren(event.Name); len(links) > 0 {
+               // Event occurred on a symlink in the directory chain of the watched path or a link target
+               // between the original file and the underlying absolute path. Practical example:
+               //
+               // /config/config.yml -> ..data/config.yml             (<-- original watched path)
+               // /config/..data -> ..2006_01_02_15_04_05.9999999999  (<-- this symlink changes)
+               // /config/..2006_01_02_15_04_05.9999999999/config.yml (<-- underlying file)
+               //
+               // State looks like:
+               //    watched: {
+               //      /config: 2 references (config.yml, ..data)
+               //      /config/config.yml: 1 reference
+               //      /config/..data: 2 references (..data/config.yml, ..data)
+               //      /config/..2006_01_02_15_04_05.9999999999: 1 reference
+               //      /config/..2006_01_02_15_04_05.9999999999/config.yml: 1 reference
+               //    }
+               // watchHandlers:
+               //    {"/config/config.yml": {inum: 123,
+               //                           absPath: "/config/..2006_01_02_15_04_05.9999999999/config.yml",
+               //                           upstreams: {"/config/..data/config.yml",
+               //                                    "/config/..2006_01_02_15_04_05.9999999999/config.yml"}
+               // symlinkPropagationMap: {
+               //     "/config/..data": {"/config/config.yml"},
+               //     "/config/..2006_01_02_15_04_05.9999999999/config.yml": {"/config/config.yml"},
+               // }
+               // In these cases we pick up the event on `/config/..data` and fire events for any original
+               // paths
+               for _, link := range links {
+                       for _, ruleName := range symlinkPropagationMap[link].AsSlice() {
+                               if _, ok := watchHandlers[ruleName]; ok {
+                                       logger.V(2).Debugf("propagating %v event on %s to origin %s",
+                                               event.Op, link, ruleName)
+                                       handleEventForPath(event, ruleName)
+                               } else {
+                                       logger.V(2).Errorf("symlinkPropagationMap target without corresponding rule: %s", ruleName)
+                               }
+                       }
+               }
+               Debug()
+               return
+       }
+       // Don't issue this warning if we received an event for something being created in a directory
+       // we're watching because of files under it
+       if _, ok := watched[path.Dir(event.Name)]; ok {
+               if event.Has(fsnotify.Create) || event.Has(fsnotify.Remove) || event.Has(fsnotify.Rename) {
+                       return
+               }
+       }
+       logger.V(1).Warningf("dangling watcher on path %s: not known within normal rules or symlink map: %v", event.Name, event.Op)
+}
+
+// handleEventForPath is the second stage of event handling which calls the actual handlers.
+// ruleName corresponds to an entry in watchHandlers.
+//
+// A single event may result in multiple handlers needing to be called, if a common parent of
+// multiple watched files is a symlink that gets repointed. For these cases, handleEventForPath is
+// called multiple times from a single invocation of handleEvent.
+func handleEventForPath(event fsnotify.Event, ruleName string) {
+       rule, ok := watchHandlers[ruleName]
+       if !ok {
+               logger.V(1).Errorf("handleEventForPath called for file without rule: %s: %+v", ruleName, event)
+               return
+       }
+
+       logger.V(2).Debugf("inotify event on %s: %v", event.Name, event.Op)
+       logger.V(2).Debugf("ruleName: %s", ruleName)
+       logger.V(2).Debugf("upstreams: %+v", rule.upstreams.AsSlice())
+
+       newRealpath := resolveSymlinkAndDereferenceParents(ruleName)
+       newInum := inum(newRealpath)
+       fileSwapped := rule.absPath != newRealpath || rule.inum != newInum
+       handlerPath := event.Name
+
+       if fileSwapped {
+               logger.V(1).Noticef(
+                       "watched file %s: detected absolute path or inum change, pruning watchers:\n  %s->%s\n  %d->%d",
+                       ruleName,
+                       rule.absPath, newRealpath,
+                       rule.inum, newInum,
+               )
+
+               handlerPath = ruleName
 
-               if event.Has(Create) {
-                       addWatch(event.Name)
-               } else if event.Has(Remove) {
-                       // remove watchers from deleted files.
-                       // we will still be notified if the file is re-created, via the watch
-                       // on the file's parent directory
-                       gWatcher.Remove(event.Name)
+               for _, upstream := range rule.upstreams.AsSortedSlice() {
+                       unsubscribe(upstream)
                }
+               untrackSymlinks(ruleName)
+               unsubscribe(ruleName)
+               if len(watchHandlers) == 1 {
+                       logger.V(2).Noticef("all watchers should now be cleaned up:")
+                       Debug()
+               }
+               subscribe(ruleName)
 
-               if event.Op == Write {
-                       pendingWrites.Add(event.Name)
-                       return
-               } else if event.Op == Close {
-                       if !pendingWrites.Contains(event.Name) {
-                               return
-                       }
+               scanSymlinks(ruleName)
+       }
+
+       if event.Has(Create) {
+               if _, ok := watched[event.Name]; !ok {
+                       subscribe(event.Name)
+               }
+       }
+
+       if event.Has(Write) {
+               pendingWrites.Add(event.Name)
+               return
+       }
 
+       if event.Has(Close) {
+               if pendingWrites.Contains(event.Name) {
                        pendingWrites.Del(event.Name)
+               } else if !fileSwapped {
+                       return
+               }
+       }
+
+       for _, handler := range rule.callbacks {
+               handler(handlerPath, event.Op)
+       }
+}
+
+func trackSymlink(realPath, ruleName string) {
+       if realPath == ruleName {
+               return
+       }
+       if _, ok := symlinkPropagationMap[realPath]; !ok {
+               symlinkPropagationMap[realPath] = hashset.NewHashSet[string]()
+       }
+       symlinkPropagationMap[realPath].Add(ruleName)
+}
+
+func untrackSymlinks(ruleName string) {
+       var toDelete []string
+       for link, targets := range symlinkPropagationMap {
+               if targets.Contains(ruleName) {
+                       logger.V(2).Debugf("cleaning up watcher on %s for fsnotify watch rule %s", link, ruleName)
+                       unsubscribe(link)
+                       targets.Del(ruleName)
+               }
+               if targets.Len() == 0 {
+                       toDelete = append(toDelete, link)
                }
+       }
+
+       for _, d := range toDelete {
+               delete(symlinkPropagationMap, d)
+       }
+}
+
+func symlinkPropagationChildren(link string) []string {
+       var targets []string
+       for path, _ := range symlinkPropagationMap {
+               if path == link || strings.HasPrefix(path, link+"/") {
+                       targets = append(targets, path)
+               }
+       }
+
+       return targets
+}
+
+func scanSymlinks(filePath string) error {
+       rule := watchHandlers[filePath]
+       rule.upstreams.Empty()
+       target := filePath
+       for {
+               if isSymlink(target) {
+                       target = resolveSymlink(target)
+                       trackSymlink(target, filePath)
+               }
+               if rule.upstreams.Contains(target) || target == filePath {
+                       break
+               }
+
+               logger.V(1).Debugf("symlink resolution step yields abspath: %s", target)
+               rule.upstreams.Add(target)
+
+               subscribe(target)
+       }
 
-               for _, handler := range handlers {
-                       handler(event.Name, event.Op)
+       for _, target = range rule.upstreams.AsSortedSlice() {
+               // For each entry in the stack of symlinks between the original path, look for symlinks in
+               // every parent directory. Ensure those are watched, so that if one of those symlinks are
+               // replaced, we can fire a change event on the original file.
+               for _, link := range symlinksInFullpath(target) {
+                       target = link.target + strings.TrimPrefix(target, link.link)
+                       logger.V(1).Debugf("while adding watcher on file %s: rewrote abspath to %s based on symlink %s->%s",
+                               filePath, target, link.link, link.target)
+                       if _, err := os.Lstat(link.target); err != nil {
+                               logger.V(1).Warningf("failed to stat resolved parent directory %s: %v", link.target, err)
+                               continue
+                       }
+                       rule.upstreams.Add(target)
+                       trackSymlink(link.link, filePath)
+                       subscribe(link.link)
+                       subscribe(target)
                }
+               trackSymlink(target, filePath)
+       }
+       rule.absPath = target
+       rule.inum = inum(target)
+
+       return nil
+}
+
+func Debug() {
+       logger.V(1).Debugf("symlinkPropagationMap:")
+       if len(symlinkPropagationMap) == 0 {
+               logger.V(1).Debugf("  <NONE>")
+       }
+       for filePath, members := range symlinkPropagationMap {
+               logger.V(1).Debugf("   %s: %+v", filePath, members.AsSortedSlice())
+       }
+
+       logger.V(1).Debugf("watchHandlers:")
+       if len(watchHandlers) == 0 {
+               logger.V(1).Debugf("  <NONE>")
+       }
+       for filePath, rule := range watchHandlers {
+               logger.V(1).Debugf("   %s: upstreams=%+v, %+v", filePath, rule.upstreams.AsSortedSlice(), rule.callbacks)
+       }
+
+       logger.V(1).Debugf("fsnotify ref counts:")
+       if len(watched) == 0 {
+               logger.V(1).Debugf("  <NONE>")
+       }
+       for filePath, n := range watched {
+               logger.V(1).Debugf("  %s: %d", filePath, n)
        }
 }
 
-func NotifyPath(path string, callback NotifyFunc) error {
+func NotifyPath(filePath string, callback NotifyFunc) error {
        err := startWatcher()
        if err != nil {
                return err
        }
 
-       logger.V(1).Debugf("adding watcher on file %s", path)
-       err = addWatch(path)
-       if err != nil {
-               return err
+       filePath = realpath(filePath)
+       if _, err := os.Lstat(filePath); err != nil {
+               return fmt.Errorf("unable to watch %s: error calling lstat: %v", filePath, err)
        }
 
-       if _, ok := watchHandlers[path]; !ok {
-               watchHandlers[path] = make([]NotifyFunc, 0)
+       logger.V(1).Debugf("adding watcher on file %s", filePath)
+       err = subscribe(filePath)
+       if err != nil {
+               return err
        }
-       watchHandlers[path] = append(watchHandlers[path], callback)
 
-       dirPath := filepath.Dir(path)
-       if dirPath != path {
-               logger.V(2).Debugf("adding watcher to parent directory %s", dirPath)
-               err = addWatch(dirPath)
-               if err != nil {
-                       logger.Warnf("failed to add watcher for %s parent directory %s: %v", path, dirPath, err)
+       if _, ok := watchHandlers[filePath]; !ok {
+               watchHandlers[filePath] = &notifyRule{
+                       upstreams: hashset.NewHashSet[string](),
                }
        }
+       rule := watchHandlers[filePath]
+
+       rule.callbacks = append(watchHandlers[filePath].callbacks, callback)
+
+       scanSymlinks(filePath)
 
        return nil
 }
+
+// Stop clears all watch rules and idles (but does not shut down) the global watcher.
+func Stop() {
+       startWatcherMu.Lock()
+
+       if gWatcher == nil {
+               startWatcherMu.Unlock()
+               return
+       }
+       startWatcherMu.Unlock()
+
+       for path, _ := range watched {
+               unsubscribeInternal(path)
+       }
+
+       startWatcherMu.Lock()
+       initState()
+       startWatcherMu.Unlock()
+}
diff --git a/mtls/fsnotify/fsnotify_test.go b/mtls/fsnotify/fsnotify_test.go
new file mode 100644 (file)
index 0000000..7707f70
--- /dev/null
@@ -0,0 +1,144 @@
+package fsnotify
+
+import (
+       "context"
+       "fmt"
+       "os"
+       "path"
+       "time"
+
+       "go.fuhry.dev/fsnotify"
+       "go.fuhry.dev/runtime/utils/checkers"
+       "go.fuhry.dev/runtime/utils/hashset"
+       . "gopkg.in/check.v1"
+)
+
+const defaultMode os.FileMode = 0600
+
+var spMapContains = checkers.ContainsT[string, *hashset.HashSet[string]]()
+
+func testNotify(c *C, filename string, writeFunc func()) {
+       ch := make(chan struct{})
+       nf := func(string, fsnotify.Op) {
+               ch <- struct{}{}
+       }
+
+       err := NotifyPath(filename, nf)
+       c.Assert(err, IsNil)
+       defer Stop()
+       writeFunc()
+
+       ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+       defer cancel()
+       select {
+       case <-ctx.Done():
+               fmt.Fprintf(os.Stderr, "timed out waiting for notify event on %s\n", filename)
+               Debug()
+               c.Fail()
+       case <-ch:
+       }
+}
+
+func ls(path string) {
+       attrs := &os.ProcAttr{
+               Files: []*os.File{os.Stdin, os.Stdout, os.Stderr},
+               Dir:   "/",
+       }
+       fmt.Fprintf(os.Stderr, "ls -la %s\n", path)
+       if proc, err := os.StartProcess("/bin/ls", []string{"ls", "-la", "--color=auto", path}, attrs); err == nil {
+               proc.Wait()
+       }
+}
+
+func tempFilename(c *C) string {
+       file, err := os.CreateTemp("", "")
+       c.Assert(err, IsNil)
+       file.Close()
+
+       s.tempFiles.Add(file.Name())
+       return file.Name()
+}
+
+func (s *FsnotifySuite) TestSimpleWatch(c *C) {
+       filename := tempFilename(c)
+
+       testNotify(c, filename, func() {
+               c.Assert(os.WriteFile(filename, []byte("hello"), defaultMode), IsNil)
+       })
+}
+
+func (s *FsnotifySuite) TestSymlinkTargetChanged(c *C) {
+       filename := tempFilename(c)
+       c.Assert(os.Symlink(filename, filename+".lnk"), IsNil)
+       defer os.Remove(filename + ".lnk")
+       testNotify(c, filename+".lnk", func() {
+               c.Assert(symlinkPropagationMap, spMapContains, filename)
+               c.Assert(os.WriteFile(filename, []byte("hello"), defaultMode), IsNil)
+       })
+}
+
+func (s *FsnotifySuite) TestBackingFileChangedWithSymlinkPathComponent(c *C) {
+       filename := tempFilename(c)
+       c.Assert(os.Remove(filename), IsNil)
+
+       linkToDir := tempFilename(c)
+       c.Assert(os.Remove(linkToDir), IsNil)
+
+       realDir, err := os.MkdirTemp("", "")
+       c.Assert(err, IsNil)
+       defer os.RemoveAll(realDir)
+
+       linkTarget := path.Join(linkToDir, "file")
+       realFile := path.Join(realDir, "file")
+
+       c.Assert(os.Symlink(realDir, linkToDir), IsNil)
+       c.Assert(os.Symlink(linkTarget, filename), IsNil)
+
+       c.Assert(os.WriteFile(realFile, []byte("hello"), defaultMode), IsNil)
+
+       // Overwrite the original backing file with new contents
+       testNotify(c, filename, func() {
+               c.Assert(symlinkPropagationMap, spMapContains, realFile)
+               c.Assert(symlinkPropagationMap, spMapContains, linkToDir)
+               c.Assert(symlinkPropagationMap, spMapContains, linkTarget)
+               c.Assert(os.WriteFile(realFile, []byte("changed"), defaultMode), IsNil)
+       })
+}
+
+func (s *FsnotifySuite) TestPathComponentSymlinkChanged(c *C) {
+       filename := tempFilename(c)
+       c.Assert(os.Remove(filename), IsNil)
+
+       linkToDir := tempFilename(c)
+       c.Assert(os.Remove(linkToDir), IsNil)
+
+       realDir1, err := os.MkdirTemp("", "")
+       c.Assert(err, IsNil)
+       defer os.RemoveAll(realDir1)
+
+       realDir2, err := os.MkdirTemp("", "")
+       c.Assert(err, IsNil)
+       defer os.RemoveAll(realDir2)
+
+       linkTarget := path.Join(linkToDir, "file")
+       realFile1 := path.Join(realDir1, "file")
+
+       c.Assert(os.Symlink(realDir1, linkToDir), IsNil)
+       c.Assert(os.Symlink(linkTarget, filename), IsNil)
+
+       c.Assert(os.WriteFile(realFile1, []byte("hello"), defaultMode), IsNil)
+
+       // Swap the linkToDir symlink from pointing at realDir1 to realDir2, changing the underlying
+       // file but not triggering any events on the watched path itself or any of its upstreams
+       testNotify(c, filename, func() {
+               c.Assert(symlinkPropagationMap, spMapContains, realFile1)
+               c.Assert(symlinkPropagationMap, spMapContains, linkToDir)
+               c.Assert(symlinkPropagationMap, spMapContains, linkTarget)
+
+               realFile2 := path.Join(realDir2, "file")
+               c.Assert(os.WriteFile(realFile2, []byte("world"), defaultMode), IsNil)
+
+               c.Assert(os.Symlink(realDir2, linkToDir+".new"), IsNil)
+               c.Assert(os.Rename(linkToDir+".new", linkToDir), IsNil)
+       })
+}
diff --git a/mtls/fsnotify/resolve_symlink_tests/01/one b/mtls/fsnotify/resolve_symlink_tests/01/one
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/mtls/fsnotify/resolve_symlink_tests/02/one b/mtls/fsnotify/resolve_symlink_tests/02/one
new file mode 100644 (file)
index 0000000..d00491f
--- /dev/null
@@ -0,0 +1 @@
+1
diff --git a/mtls/fsnotify/resolve_symlink_tests/02/two b/mtls/fsnotify/resolve_symlink_tests/02/two
new file mode 120000 (symlink)
index 0000000..43dd47e
--- /dev/null
@@ -0,0 +1 @@
+one
\ No newline at end of file
diff --git a/mtls/fsnotify/resolve_symlink_tests/03/one b/mtls/fsnotify/resolve_symlink_tests/03/one
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/mtls/fsnotify/resolve_symlink_tests/03/three b/mtls/fsnotify/resolve_symlink_tests/03/three
new file mode 120000 (symlink)
index 0000000..64c5e58
--- /dev/null
@@ -0,0 +1 @@
+two
\ No newline at end of file
diff --git a/mtls/fsnotify/resolve_symlink_tests/03/two b/mtls/fsnotify/resolve_symlink_tests/03/two
new file mode 120000 (symlink)
index 0000000..43dd47e
--- /dev/null
@@ -0,0 +1 @@
+one
\ No newline at end of file
diff --git a/mtls/fsnotify/resolve_symlink_tests/04/one b/mtls/fsnotify/resolve_symlink_tests/04/one
new file mode 120000 (symlink)
index 0000000..64c5e58
--- /dev/null
@@ -0,0 +1 @@
+two
\ No newline at end of file
diff --git a/mtls/fsnotify/resolve_symlink_tests/04/two b/mtls/fsnotify/resolve_symlink_tests/04/two
new file mode 120000 (symlink)
index 0000000..43dd47e
--- /dev/null
@@ -0,0 +1 @@
+one
\ No newline at end of file
diff --git a/mtls/fsnotify/resolve_symlink_tests/05/one b/mtls/fsnotify/resolve_symlink_tests/05/one
new file mode 120000 (symlink)
index 0000000..1d19714
--- /dev/null
@@ -0,0 +1 @@
+three
\ No newline at end of file
diff --git a/mtls/fsnotify/resolve_symlink_tests/05/three b/mtls/fsnotify/resolve_symlink_tests/05/three
new file mode 120000 (symlink)
index 0000000..64c5e58
--- /dev/null
@@ -0,0 +1 @@
+two
\ No newline at end of file
diff --git a/mtls/fsnotify/resolve_symlink_tests/05/two b/mtls/fsnotify/resolve_symlink_tests/05/two
new file mode 120000 (symlink)
index 0000000..43dd47e
--- /dev/null
@@ -0,0 +1 @@
+one
\ No newline at end of file
diff --git a/mtls/fsnotify/resolve_symlink_tests/06/one b/mtls/fsnotify/resolve_symlink_tests/06/one
new file mode 120000 (symlink)
index 0000000..e466dcb
--- /dev/null
@@ -0,0 +1 @@
+invalid
\ No newline at end of file
diff --git a/mtls/fsnotify/resolve_symlink_tests/07/one b/mtls/fsnotify/resolve_symlink_tests/07/one
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/mtls/fsnotify/resolve_symlink_tests/07/sub/two b/mtls/fsnotify/resolve_symlink_tests/07/sub/two
new file mode 120000 (symlink)
index 0000000..747114a
--- /dev/null
@@ -0,0 +1 @@
+../one
\ No newline at end of file
diff --git a/mtls/fsnotify/resolve_symlink_tests/08/one b/mtls/fsnotify/resolve_symlink_tests/08/one
new file mode 120000 (symlink)
index 0000000..11ef329
--- /dev/null
@@ -0,0 +1 @@
+sub/two
\ No newline at end of file
diff --git a/mtls/fsnotify/resolve_symlink_tests/08/sub/two b/mtls/fsnotify/resolve_symlink_tests/08/sub/two
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/mtls/fsnotify/util.go b/mtls/fsnotify/util.go
new file mode 100644 (file)
index 0000000..5280cc4
--- /dev/null
@@ -0,0 +1,192 @@
+package fsnotify
+
+import (
+       "io/fs"
+       "os"
+       "path"
+       "path/filepath"
+       "strings"
+       "syscall"
+
+       "go.fuhry.dev/runtime/utils/hashset"
+)
+
+type symlink struct {
+       link, target string
+}
+
+// inum returns the inode number of filePath.
+//
+// if filePath is a symlink, it is dereferenced.
+//
+// Returns -1 upon failure.
+func inum(filePath string) int64 {
+       stat, err := os.Stat(filePath)
+       if err != nil {
+               return -1
+       }
+       if sys, ok := stat.Sys().(*syscall.Stat_t); ok {
+               return int64(sys.Ino)
+       }
+       return -1
+}
+
+// isSymlink returns true if fullPath is a symlink, and false otherwise.
+//
+// Errors such as an readable parent directory or the given path not existing result in a return
+// value of false.
+func isSymlink(fullPath string) bool {
+       if stat, err := os.Lstat(fullPath); err == nil {
+               return stat.Mode()&fs.ModeSymlink == fs.ModeSymlink
+       }
+
+       return false
+}
+
+// getcwd returns the current working directory.
+func getcwd() string {
+       buf := make([]byte, 1024)
+       n, err := syscall.Getcwd(buf)
+       if err != nil {
+               panic("could not get cwd")
+       }
+       return string(buf[:n-1])
+}
+
+// realpath removes "." components and replaces ".." components in an absolute or relative path,
+// always returning an absolute path.
+//
+// filePath is not required to be a real file.
+//
+// Relative paths are resolved with respect to the current working directory.
+func realpath(filePath string) string {
+       if !path.IsAbs(filePath) {
+               cwd := getcwd()
+               filePath = path.Join(cwd, filePath)
+       }
+       parts := strings.Split(filePath, string(filepath.Separator))
+       var out []string
+       for _, part := range parts {
+               switch part {
+               case ".":
+                       continue
+               case "..":
+                       if len(out) > 1 {
+                               out = out[:len(out)-1]
+                       }
+               default:
+                       out = append(out, part)
+               }
+       }
+
+       return strings.Join(out, string(filepath.Separator))
+}
+
+// resolveSymlinkRecurse recursively resolves filePath until a real file is found.
+//
+// Both relative and absolute symlinks are handled and normalized.
+//
+// If the file at filePath is not a symlink, it is normalized and returned.
+//
+// The optional argument `stack` may be a string HashSet which is populated with normalized paths
+// of each resolution step. If set to nil, a new hashmap is created.
+//
+// resolveSymlinkRecurse stops when it encounters a non-symlink, loop or a dangling symlink.
+func resolveSymlinkRecurse(filePath string, stack *hashset.HashSet[string]) string {
+       if stack == nil {
+               stack = hashset.NewHashSet[string]()
+       }
+       filePath = realpath(filePath)
+       if stack.Contains(filePath) {
+               return filePath
+       }
+       stack.Add(filePath)
+
+       if isSymlink(filePath) {
+               // this is a symlink
+               if target, err := os.Readlink(filePath); err == nil {
+                       if path.IsAbs(target) {
+                               if target == filePath {
+                                       // prevent infinite symlink loops
+                                       return target
+                               }
+                               return resolveSymlinkRecurse(target, stack)
+                       }
+                       filePath = path.Join(path.Dir(filePath), target)
+                       filePath = realpath(filePath)
+                       return resolveSymlinkRecurse(filePath, stack)
+               }
+       }
+
+       return filePath
+}
+
+// resolveSymlink is a wrapper for os.Readlink that normalizes relative paths and resolves symlinks
+// to a maximum depth of one.
+func resolveSymlink(filePath string) string {
+       if isSymlink(filePath) {
+               if target, err := os.Readlink(filePath); err == nil {
+                       if path.IsAbs(target) {
+                               return target
+                       }
+                       filePath = path.Join(path.Dir(filePath), target)
+                       filePath = realpath(filePath)
+               }
+       }
+       return filePath
+}
+
+// symlinksInFullpath walks each component of the given path, checks if it's a symlink, and returns
+// a list of components which are symlinks, mapping each link to its target.
+//
+// For example, given the path:
+//
+//     /a/b/c/d/foo
+//
+// If "b" is a symlink to "e", this would return:
+//
+//     []symlink{{link: "/a/b", target: "/a/e"}}
+func symlinksInFullpath(fullPath string) []symlink {
+       fullPath = realpath(fullPath)
+
+       var out []symlink
+       pathParts := strings.Split(fullPath, "/")
+       for i, _ := range pathParts {
+               if i == 0 {
+                       continue
+               }
+
+               subpath := strings.Join(pathParts[:i], "/")
+               if isSymlink(subpath) {
+                       target := resolveSymlinkRecurse(subpath, nil)
+                       out = append(out, symlink{subpath, target})
+               }
+       }
+
+       return out
+}
+
+// dereferenceParents uses symlinksInFullpath to find any path components in fullPath which are
+// symlinks and re-writes fullPath with the resolved name.
+//
+// For example, given the path:
+//
+//     /a/b/c/d/foo
+//
+// If "b" is a symlink to "e", this would return:
+//
+//     /a/e/c/d/foo
+func dereferenceParents(fullPath string) string {
+       for _, link := range symlinksInFullpath(fullPath) {
+               fullPath = link.target + strings.TrimPrefix(fullPath, link.link)
+       }
+       return fullPath
+}
+
+// resolveSymlinkAndDereferenceParents is a wrapper for calling both resolveSymlinkRecurse and
+// dereferenceParents, returning the true absolute path of a file accounting for either the file
+// itself or any component in its full path being a symlink.
+func resolveSymlinkAndDereferenceParents(fullPath string) string {
+       fullPath = resolveSymlinkRecurse(fullPath, nil)
+       return dereferenceParents(fullPath)
+}
diff --git a/mtls/fsnotify/util_test.go b/mtls/fsnotify/util_test.go
new file mode 100644 (file)
index 0000000..0b3e857
--- /dev/null
@@ -0,0 +1,57 @@
+package fsnotify
+
+import (
+       "fmt"
+       "path"
+
+       . "gopkg.in/check.v1"
+)
+
+func (s *FsnotifySuite) TestRealpath(c *C) {
+       type testCase struct {
+               in string
+               e  string
+       }
+
+       testCases := []*testCase{
+               {"/etc/foo/bar", "/etc/foo/bar"},
+               {"/etc/foo/..", "/etc"},
+               {"/etc/foo/../hosts", "/etc/hosts"},
+               {"/../etc/hosts", "/etc/hosts"},
+               {"/etc/foo/./hosts", "/etc/foo/hosts"},
+               {"/./etc/hosts", "/etc/hosts"},
+               {"/etc/foo/../../var/run", "/var/run"},
+       }
+
+       for _, tc := range testCases {
+               got := realpath(tc.in)
+               c.Assert(got, Equals, tc.e) // , "test case %d", i)
+       }
+}
+
+func (s *FsnotifySuite) TestResolveSymlink(c *C) {
+       type testCase struct {
+               in   string
+               e    string
+               desc string
+       }
+
+       testCases := []*testCase{
+               {"one", "one", "not a symlink"},
+               {"two", "one", "simple valid symlink"},
+               {"three", "one", "two levels deep symlink"},
+               {"one", "one", "recursive set with 2 links"},
+               {"three", "three", "recursive set with 3 links"},
+               {"one", "invalid", "target missing"},
+               {"sub/two", "one", "link to item in parent directory"},
+               {"one", "sub/two", "link to item in child directory"},
+       }
+
+       cwd := getcwd()
+       for i, tc := range testCases {
+               testDir := path.Join(cwd, "resolve_symlink_tests", fmt.Sprintf("%02d", i+1))
+               in := path.Join(testDir, tc.in)
+               got := resolveSymlinkRecurse(in, nil)
+               c.Assert(got, Equals, path.Join(testDir, tc.e)) // , "test case %d: %s", i+1, tc.desc)
+       }
+}
diff --git a/utils/checkers/contains_key.go b/utils/checkers/contains_key.go
new file mode 100644 (file)
index 0000000..5962e64
--- /dev/null
@@ -0,0 +1,38 @@
+package checkers
+
+import (
+       "fmt"
+
+       . "gopkg.in/check.v1"
+)
+
+type containsKeyChecker[T comparable, Q any] struct{}
+
+func (c *containsKeyChecker[T, Q]) Check(params []any, names []string) (bool, string) {
+       m, ok := params[0].(map[T]Q)
+       if !ok {
+               return false, "first argument must be a map with key type T"
+       }
+       k, ok := params[1].(T)
+       if !ok {
+               return false, "second argument must be of type T"
+       }
+       if _, ok := m[k]; !ok {
+               return false, fmt.Sprintf("map does not contain key %+v", k)
+       }
+       return true, ""
+}
+
+func (c *containsKeyChecker[T, Q]) Info() *CheckerInfo {
+       return &CheckerInfo{
+               Name: "ContainsKey",
+               Params: []string{
+                       "map",
+                       "key",
+               },
+       }
+}
+
+func ContainsT[T comparable, Q any]() Checker {
+       return &containsKeyChecker[T, Q]{}
+}