]> go.fuhry.dev Git - runtime.git/commitdiff
[ephs] split out low level client, add r/w permissions
authorDan Fuhry <dan@fuhry.com>
Sat, 14 Mar 2026 23:14:05 +0000 (19:14 -0400)
committerDan Fuhry <dan@fuhry.com>
Sat, 14 Mar 2026 23:22:17 +0000 (19:22 -0400)
ephsll contains the etcd interaction parts of ephs, and includes an `ephs.Client` implementation which can replace the default client for a process that has unrestricted access to the etcd prefix.

ACLs for ephs now also gain mode bits for read and write access separately, allowing subtrees or individual paths to be world-readable but writable only by the owner.

15 files changed:
cmd/ephs_client/BUILD.bazel
cmd/ephs_client/main.go
cmd/ephs_server/main.go
ephs/client.go
ephs/ephsll/BUILD.bazel
ephs/ephsll/all_test.go [new file with mode: 0644]
ephs/ephsll/client.go [new file with mode: 0644]
ephs/ephsll/errors.go [new file with mode: 0644]
ephs/ephsll/low_level_client.go
ephs/ephsll/low_level_client_test.go [new file with mode: 0644]
ephs/path.go
ephs/servicer/BUILD.bazel
ephs/servicer/acl.go
ephs/servicer/acl_test.go
ephs/servicer/servicer.go

index 906286db0cdb03064cdb41275362987d412ffb9c..1f4550e5df7e6dc73fdfe07e1aa34bf99364a71b 100644 (file)
@@ -8,6 +8,8 @@ go_library(
     deps = [
         "//constants",
         "//ephs",
+        "//ephs/ephsll",
+        "//mtls",
         "//utils/context",
         "//utils/fsutil",
         "//utils/log",
index 93b4f6dc93144f4cbac8a882bb46422bd27ec0a2..c90f904f22457b46d7c51876e6aae90f483cde0e 100644 (file)
@@ -16,13 +16,31 @@ import (
 
        "go.fuhry.dev/runtime/constants"
        "go.fuhry.dev/runtime/ephs"
+       "go.fuhry.dev/runtime/ephs/ephsll"
+       "go.fuhry.dev/runtime/mtls"
        "go.fuhry.dev/runtime/utils/context"
        "go.fuhry.dev/runtime/utils/fsutil"
        "go.fuhry.dev/runtime/utils/log"
 )
 
+func ephsClient(cmd *cli.Command) (ephs.Client, error) {
+       /*
+               if !cmd.Bool("ll") {
+                       return ephs.DefaultClient()
+               }
+       */
+
+       identity := mtls.DefaultIdentity()
+       ll, err := ephsll.NewEphsLowLevelClient(identity)
+       if err != nil {
+               return nil, err
+       }
+
+       return ll.HighLevelClient(), nil
+}
+
 func cmdStat(ctx context.Context, cmd *cli.Command) error {
-       client, err := ephs.DefaultClient()
+       client, err := ephsClient(cmd)
        if err != nil {
                return err
        }
@@ -37,7 +55,7 @@ func cmdStat(ctx context.Context, cmd *cli.Command) error {
 }
 
 func cmdCat(ctx context.Context, cmd *cli.Command) error {
-       client, err := ephs.DefaultClient()
+       client, err := ephsClient(cmd)
        if err != nil {
                return err
        }
@@ -53,7 +71,7 @@ func cmdCat(ctx context.Context, cmd *cli.Command) error {
 }
 
 func cmdCopy(ctx context.Context, cmd *cli.Command) error {
-       client, err := ephs.DefaultClient()
+       client, err := ephsClient(cmd)
        if err != nil {
                return err
        }
@@ -112,7 +130,7 @@ func cmdCopy(ctx context.Context, cmd *cli.Command) error {
 }
 
 func cmdDelete(ctx context.Context, cmd *cli.Command) error {
-       client, err := ephs.DefaultClient()
+       client, err := ephsClient(cmd)
        if err != nil {
                return err
        }
@@ -127,7 +145,7 @@ func cmdDelete(ctx context.Context, cmd *cli.Command) error {
 }
 
 func cmdWatch(ctx context.Context, cmd *cli.Command) error {
-       client, err := ephs.DefaultClient()
+       client, err := ephsClient(cmd)
        if err != nil {
                return err
        }
@@ -144,7 +162,7 @@ func cmdWatch(ctx context.Context, cmd *cli.Command) error {
 }
 
 func cmdMkdir(ctx context.Context, cmd *cli.Command) error {
-       client, err := ephs.DefaultClient()
+       client, err := ephsClient(cmd)
        if err != nil {
                return err
        }
@@ -159,7 +177,7 @@ func cmdMkdir(ctx context.Context, cmd *cli.Command) error {
 }
 
 func cmdEdit(ctx context.Context, cmd *cli.Command) error {
-       client, err := ephs.DefaultClient()
+       client, err := ephsClient(cmd)
        if err != nil {
                return err
        }
@@ -259,12 +277,17 @@ func main() {
        ctx, cancel := context.Interruptible()
        defer cancel()
 
-       flag.Parse()
-
+       exe, _ := os.Executable()
        cmd := &cli.Command{
-               Name:        "ephs",
-               Version:     constants.Version,
-               Description: "interact with ephs",
+               Name:                  path.Base(exe),
+               Version:               constants.Version,
+               Description:           "interact with ephs",
+               AllowExtFlags:         true,
+               EnableShellCompletion: true,
+               Before: func(ctx context.Context, c *cli.Command) (context.Context, error) {
+                       flag.CommandLine.Parse([]string{})
+                       return ctx, nil
+               },
 
                Commands: []*cli.Command{
                        {
@@ -364,25 +387,15 @@ func main() {
                },
 
                Flags: []cli.Flag{
-                       &cli.IntFlag{
-                               Name:  "vv",
-                               Usage: "verbosity level",
-                       },
-                       &cli.StringFlag{
-                               Name:  "v",
-                               Usage: "log level",
-                       },
-                       &cli.StringFlag{
-                               Name:  "grpc.transport",
-                               Usage: "grpc transport (tcp or quic)",
+                       &cli.BoolFlag{
+                               Name:    "ll",
+                               Aliases: []string{"low-level"},
+                               Usage:   "use low-level client, communicating directly with etcd instead of looking for an ephs server",
                        },
                },
        }
 
-       args := os.Args[:1]
-       args = append(args, flag.Args()...)
-
-       if err := cmd.Run(ctx, args); err != nil {
+       if err := cmd.Run(ctx, os.Args); err != nil {
                log.Fatal(err)
                os.Exit(1)
        }
index 79685cc0784cdda85fe3f25721707c80f3ef22fd..c4bb9ed06f3eb0c1d4e49f2b3bcb22e82aba68f1 100644 (file)
@@ -26,11 +26,6 @@ func main() {
        flag.Parse()
 
        serverIdentity := mtls.DefaultIdentity()
-       s, err := grpc.NewGrpcServer(serverIdentity, grpc.WithTransport(&grpc.QUICConnectionFactory{}))
-       if err != nil {
-               log.Fatal(err)
-               os.Exit(1)
-       }
 
        var opts []servicer.Option
        if *acl != "" {
@@ -49,6 +44,15 @@ func main() {
                os.Exit(1)
        }
 
+       serv.InstallClient()
+
+       s, err := grpc.NewGrpcServer(serverIdentity,
+               grpc.WithTransport(&grpc.QUICConnectionFactory{}))
+       if err != nil {
+               log.Fatal(err)
+               os.Exit(1)
+       }
+
        ctx, cancel := context.Interruptible()
        defer cancel()
 
@@ -63,7 +67,3 @@ func main() {
 
        <-ctx.Done()
 }
-
-func init() {
-       mtls.SetDefaultIdentity("ephs")
-}
index 5a1943db0b5aecb8850ba8d0eb550df7fbbfb238..c2006c5431058a7a08a136b0feff1e9158cf8157 100644 (file)
@@ -78,6 +78,8 @@ type getReader struct {
        cancel context.CancelFunc
 }
 
+var logger = log.WithPrefix("ephs.Client")
+
 var DefaultClientContext context.Context
 
 var ephsQuicConfig = &quic.Config{
@@ -206,7 +208,18 @@ func DefaultClient() (Client, error) {
        return defaultClient, nil
 }
 
+func OverrideDefaultClient(c Client) {
+       defaultClientMu.Lock()
+       defer defaultClientMu.Unlock()
+
+       defaultClient = c
+}
+
 func NewClient(ctx context.Context, localId mtls.Identity, opts ...ClientOption) (Client, error) {
+       if localId.Class() == mtls.AnonymousPrincipal {
+               return nil, fmt.Errorf("ephs is not available with anonymous authentication")
+       }
+
        cl := &clientImpl{
                defaultCtx:     ctx,
                defaultTimeout: 15 * time.Second,
@@ -448,16 +461,16 @@ func (c *clientImpl) watch(origCtx context.Context, origPath string, stream ephs
                                if rpc, err := c.grpcClient(); err == nil {
                                        stream, err = rpc.Watch(origCtx, &ephs_pb.GetRequest{Path: origPath})
                                        if err == nil {
-                                               log.Default().Noticef("reconnected watcher with new stream")
+                                               logger.Noticef("reconnected watcher with new stream")
                                                continue
                                        } else {
-                                               log.Default().Errorf("error reestablishing watch stream: %v", err)
+                                               logger.Errorf("error reestablishing watch stream: %v", err)
                                        }
                                } else {
-                                       log.Default().Errorf("error reconnecting: %v", err)
+                                       logger.Errorf("error reconnecting: %v", err)
                                }
                        }
-                       log.Default().Errorf("error receiving watch stream: %v", err)
+                       logger.Errorf("error receiving watch stream: %v", err)
                        return
                }
 
@@ -495,10 +508,10 @@ func (c *clientImpl) grpcClient() (ephs_pb.EphsClient, error) {
                if lastErr == nil {
                        break
                }
-               log.Default().Warningf("error establishing grpc connection to ephs server, retrying in 1s: %T: %v", lastErr, lastErr)
+               logger.Warningf("error establishing grpc connection to ephs server, retrying in 1s: %T: %v", lastErr, lastErr)
                select {
                case <-c.defaultCtx.Done():
-                       log.Default().Error(c.defaultCtx.Err())
+                       logger.Error(c.defaultCtx.Err())
                        return nil, c.defaultCtx.Err()
                case <-time.NewTimer(1 * time.Second).C:
                }
@@ -507,8 +520,8 @@ func (c *clientImpl) grpcClient() (ephs_pb.EphsClient, error) {
                return nil, lastErr
        }
 
-       confFsCl := ephs_pb.NewEphsClient(conn)
-       return confFsCl, nil
+       grpcCl := ephs_pb.NewEphsClient(conn)
+       return grpcCl, nil
 }
 
 func humanFilesize(s uint64) string {
index 8601f28d4eab357783b3fafccf26c678c9dbdb49..3794d7e2b4b2e6f6c59afa99614a486652e45b91 100644 (file)
@@ -1,8 +1,10 @@
-load("@rules_go//go:def.bzl", "go_library")
+load("@rules_go//go:def.bzl", "go_library", "go_test")
 
 go_library(
     name = "ephsll",
     srcs = [
+        "client.go",
+        "errors.go",
         "fs_object.go",
         "low_level_client.go",
         "s3.go",
@@ -15,6 +17,7 @@ go_library(
         "//mtls",
         "//proto/service/ephs",
         "//sd",
+        "//utils/context",
         "//utils/log",
         "//utils/option",
         "@com_github_minio_minio_go_v7//:minio-go",
@@ -27,3 +30,20 @@ go_library(
         "@org_golang_google_protobuf//types/known/timestamppb",
     ],
 )
+
+go_test(
+    name = "ephsll_test",
+    srcs = [
+        "all_test.go",
+        "low_level_client_test.go",
+    ],
+    embed = [":ephsll"],
+    deps = [
+        "//ephs",
+        "//mtls",
+        "//testlibs/etcdtest",
+        "//utils/log",
+        "@in_gopkg_check_v1//:check_v1",
+        "@io_etcd_go_etcd_client_v3//:client",
+    ],
+)
diff --git a/ephs/ephsll/all_test.go b/ephs/ephsll/all_test.go
new file mode 100644 (file)
index 0000000..09d5f7f
--- /dev/null
@@ -0,0 +1,68 @@
+package ephsll
+
+import (
+       "context"
+       "testing"
+       "time"
+
+       etcd_client "go.etcd.io/etcd/client/v3"
+       "go.fuhry.dev/runtime/ephs"
+       "go.fuhry.dev/runtime/mtls"
+       "go.fuhry.dev/runtime/testlibs/etcdtest"
+       "go.fuhry.dev/runtime/utils/log"
+       . "gopkg.in/check.v1"
+)
+
+const testCell = "ephs.test.invalid"
+
+type EphsLowLevelTestSuite struct {
+       etcd   etcdtest.EtcdTestServer
+       llc    EphsLowLevelClient
+       ctx    context.Context
+       cancel context.CancelFunc
+       id     mtls.Identity
+}
+
+var s = &EphsLowLevelTestSuite{}
+var _ = Suite(s)
+
+func Test(t *testing.T) {
+       TestingT(t)
+}
+
+func (s *EphsLowLevelTestSuite) SetUpSuite(c *C) {
+       s.ctx, s.cancel = context.WithTimeout(context.Background(), 3*time.Second)
+       s.id = mtls.NewLazyIdentity(mtls.ServicePrincipal, "ephs")
+
+       ephs.OverrideDefaultCell(testCell)
+
+       ms, err := etcdtest.NewEtcdTestServer()
+       c.Assert(err, IsNil)
+
+       s.etcd = ms
+
+       llc := &ephsLowLevelClientImpl{
+               id:     s.id,
+               logger: log.WithPrefix("EphsLowLevelClient"),
+               clients: map[string]*etcd_client.Client{
+                       testCell: s.etcdClient(c),
+               },
+       }
+       s.llc = llc
+}
+
+func (s *EphsLowLevelTestSuite) TearDownSuite(c *C) {
+       if s.etcd != nil {
+               s.etcd.Close()
+       }
+       s.cancel()
+}
+
+func (s *EphsLowLevelTestSuite) etcdClient(c *C) *etcd_client.Client {
+       c.Assert(s.etcd, NotNil)
+
+       client, err := s.etcd.Client()
+       c.Assert(err, IsNil)
+
+       return client
+}
diff --git a/ephs/ephsll/client.go b/ephs/ephsll/client.go
new file mode 100644 (file)
index 0000000..5d8d355
--- /dev/null
@@ -0,0 +1,142 @@
+package ephsll
+
+import (
+       "context"
+       "io"
+
+       "go.fuhry.dev/runtime/ephs"
+       "go.fuhry.dev/runtime/mtls"
+       ephs_proto "go.fuhry.dev/runtime/proto/service/ephs"
+)
+
+type ephsllFrontendClient struct {
+       baseCtx context.Context
+       ll      EphsLowLevelClient
+       localId mtls.Identity
+}
+
+var _ ephs.Client = &ephsllFrontendClient{}
+
+func newFrontendClient(baseCtx context.Context, ll EphsLowLevelClient, localId mtls.Identity) ephs.Client {
+       return &ephsllFrontendClient{
+               baseCtx: baseCtx,
+               ll:      ll,
+               localId: localId,
+       }
+}
+
+// Get retrieves the content of a file at the specified path
+func (c *ephsllFrontendClient) Get(path string) (io.Reader, error) {
+       return c.GetContext(c.baseCtx, path)
+}
+
+// GetContext retrieves the content of a file at the specified path with context
+func (c *ephsllFrontendClient) GetContext(ctx context.Context, path string) (io.ReadCloser, error) {
+       ephsPath, err := ephs.ParsePath(path)
+       if err != nil {
+               return nil, err
+       }
+
+       entry, reader, err := c.ll.Get(ctx, ephsPath)
+       if err != nil {
+               return nil, err
+       }
+
+       // Return a wrapper that properly handles closing
+       return &ephsReadCloser{
+               reader: reader,
+               entry:  entry,
+       }, nil
+}
+
+// Stat retrieves metadata about a file or directory at the specified path
+func (c *ephsllFrontendClient) Stat(path string) (*ephs_proto.FsEntry, error) {
+       return c.StatContext(c.baseCtx, path)
+}
+
+// StatContext retrieves metadata about a file or directory at the specified path with context
+func (c *ephsllFrontendClient) StatContext(ctx context.Context, path string) (*ephs_proto.FsEntry, error) {
+       ephsPath, err := ephs.ParsePath(path)
+       if err != nil {
+               return nil, err
+       }
+
+       return c.ll.Stat(ctx, ephsPath)
+}
+
+// Put writes content to a file at the specified path
+func (c *ephsllFrontendClient) Put(path string, size uint64, content io.Reader) (*ephs_proto.FsEntry, error) {
+       return c.PutContext(c.baseCtx, path, size, content)
+}
+
+// PutContext writes content to a file at the specified path with context
+func (c *ephsllFrontendClient) PutContext(ctx context.Context, path string, size uint64, content io.Reader) (*ephs_proto.FsEntry, error) {
+       ephsPath, err := ephs.ParsePath(path)
+       if err != nil {
+               return nil, err
+       }
+
+       return c.ll.Put(ctx, ephsPath, c.localId.Name(), 0, size, content)
+}
+
+// Delete removes a file or directory at the specified path
+func (c *ephsllFrontendClient) Delete(path string, recursive bool) error {
+       return c.DeleteContext(c.baseCtx, path, recursive)
+}
+
+// DeleteContext removes a file or directory at the specified path with context
+func (c *ephsllFrontendClient) DeleteContext(ctx context.Context, path string, recursive bool) error {
+       ephsPath, err := ephs.ParsePath(path)
+       if err != nil {
+               return err
+       }
+
+       return c.ll.Delete(ctx, ephsPath, recursive)
+}
+
+// MkDir creates a directory at the specified path
+func (c *ephsllFrontendClient) MkDir(path string, recursive bool) error {
+       return c.MkDirContext(c.baseCtx, path, recursive)
+}
+
+// MkDirContext creates a directory at the specified path with context
+func (c *ephsllFrontendClient) MkDirContext(ctx context.Context, path string, recursive bool) error {
+       ephsPath, err := ephs.ParsePath(path)
+       if err != nil {
+               return err
+       }
+
+       return c.ll.Mkdir(ctx, ephsPath, c.localId.Name(), recursive)
+}
+
+// Watch watches for changes to a file or directory at the specified path
+func (c *ephsllFrontendClient) Watch(ctx context.Context, path string) (<-chan *ephs_proto.WatchResponse, error) {
+       ephsPath, err := ephs.ParsePath(path)
+       if err != nil {
+               return nil, err
+       }
+
+       return c.ll.Watch(ctx, ephsPath)
+}
+
+// ephsReadCloser wraps an io.Reader to provide proper Close behavior
+type ephsReadCloser struct {
+       reader io.Reader
+       entry  *ephs_proto.FsEntry
+       closed bool
+}
+
+func (r *ephsReadCloser) Read(p []byte) (int, error) {
+       if r.closed {
+               return 0, io.EOF
+       }
+       return r.reader.Read(p)
+}
+
+func (r *ephsReadCloser) Close() error {
+       if rc, ok := r.reader.(io.ReadCloser); ok {
+               r.closed = true
+               return rc.Close()
+       }
+       return nil
+}
diff --git a/ephs/ephsll/errors.go b/ephs/ephsll/errors.go
new file mode 100644 (file)
index 0000000..e10ffdf
--- /dev/null
@@ -0,0 +1,23 @@
+package ephsll
+
+import (
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
+)
+
+func IsAlreadyExists(err error) bool {
+       return errIsCode(err, codes.AlreadyExists)
+}
+
+func errIsCode(err error, code codes.Code) bool {
+       if err == nil {
+               return false
+       }
+
+       if st, ok := status.FromError(err); ok {
+               if st.Code() == code {
+                       return true
+               }
+       }
+       return false
+}
index 8fd085f7c6ba4610075b55dbbe22aa74f7b721d3..f67b26a380a043477d6323347873bce95d7d882b 100644 (file)
@@ -5,7 +5,6 @@
 package ephsll
 
 import (
-       "context"
        "fmt"
        "io"
        "slices"
@@ -24,6 +23,7 @@ import (
        "go.fuhry.dev/runtime/mtls"
        ephs_proto "go.fuhry.dev/runtime/proto/service/ephs"
        "go.fuhry.dev/runtime/sd"
+       "go.fuhry.dev/runtime/utils/context"
        "go.fuhry.dev/runtime/utils/log"
        "go.fuhry.dev/runtime/utils/option"
 )
@@ -31,6 +31,7 @@ import (
 type Option = option.Option[*ephsLowLevelClientImpl]
 
 type EphsLowLevelClient interface {
+       HighLevelClient() ephs.Client
        Stat(ctx context.Context, path ephs.Path) (*ephs_proto.FsEntry, error)
        Get(ctx context.Context, path ephs.Path) (*ephs_proto.FsEntry, io.Reader, error)
        Mkdir(ctx context.Context, path ephs.Path, owner string, recursive bool) error
@@ -76,6 +77,12 @@ func NewEphsLowLevelClient(id mtls.Identity, opts ...Option) (EphsLowLevelClient
        return ll, nil
 }
 
+func (c *ephsLowLevelClientImpl) HighLevelClient() ephs.Client {
+       ctx, _ := context.Interruptible()
+
+       return newFrontendClient(ctx, c, c.id)
+}
+
 // Stat
 func (c *ephsLowLevelClientImpl) Stat(ctx context.Context, path ephs.Path) (*ephs_proto.FsEntry, error) {
        ent, err := c.getProtoForPath(ctx, path)
diff --git a/ephs/ephsll/low_level_client_test.go b/ephs/ephsll/low_level_client_test.go
new file mode 100644 (file)
index 0000000..503887e
--- /dev/null
@@ -0,0 +1,59 @@
+package ephsll
+
+import (
+       "bytes"
+       "io"
+
+       "go.fuhry.dev/runtime/ephs"
+       . "gopkg.in/check.v1"
+)
+
+func (s *EphsLowLevelTestSuite) TestMkdir(c *C) {
+       path, err := ephs.ParsePath("/ephs/local/test_dir")
+       c.Assert(err, IsNil)
+
+       err = s.llc.Mkdir(s.ctx, path, s.id.Name(), true)
+       c.Assert(err, IsNil)
+}
+
+func (s *EphsLowLevelTestSuite) TestPut(c *C) {
+       path, err := ephs.ParsePath("/ephs/local/test_put")
+       c.Assert(err, IsNil)
+
+       err = s.llc.Mkdir(s.ctx, path.Parent(), s.id.Name(), true)
+       if err != nil {
+               c.Assert(IsAlreadyExists(err), Equals, true)
+       }
+
+       contents := bytes.NewBufferString("test ephs file contents")
+       fse, err := s.llc.Put(s.ctx, path, s.id.Name(), 0, uint64(contents.Len()), contents)
+       c.Assert(err, IsNil)
+       c.Assert(fse.GetContent().GetFile(), NotNil)
+}
+
+func (s *EphsLowLevelTestSuite) TestGet(c *C) {
+       path, err := ephs.ParsePath("/ephs/local/test_get")
+       c.Assert(err, IsNil)
+
+       err = s.llc.Mkdir(s.ctx, path.Parent(), s.id.Name(), true)
+       if err != nil {
+               c.Assert(IsAlreadyExists(err), Equals, true)
+       }
+
+       contents := []byte("test ephs file contents")
+       size := uint64(len(contents))
+       _, err = s.llc.Put(s.ctx, path, s.id.Name(), 1, size, bytes.NewBuffer(contents))
+       c.Assert(err, IsNil)
+
+       fse, reader, err := s.llc.Get(s.ctx, path)
+       c.Assert(err, IsNil)
+       c.Assert(fse.GetVersion(), Equals, uint64(1))
+       c.Assert(fse.GetContent(), NotNil)
+       c.Assert(fse.GetContent().GetFile(), NotNil)
+       c.Assert(fse.GetSize(), Equals, size)
+
+       readContents, err := io.ReadAll(reader)
+       c.Assert(err, IsNil)
+
+       c.Assert(readContents, DeepEquals, contents)
+}
index 3cf305857fcb75084d08984b0a0a6e3f483d1ee6..7562a9351f337be174c3c1997c97915164248b42 100644 (file)
@@ -40,6 +40,10 @@ func DefaultCell() string {
        return ephsDefaultCell
 }
 
+func OverrideDefaultCell(cell string) {
+       ephsDefaultCell = cell
+}
+
 func IsEphsPath(p string) bool {
        if !strings.HasPrefix(p, KeyPrefix) {
                return false
index 17213fb3f4119c8f23dba908d20a55a752d47e79..3b7481a9cdbdca3a1cad3cbe3f3fce07212b43f4 100644 (file)
@@ -16,6 +16,7 @@ go_library(
         "//mtls",
         "//mtls/fsnotify",
         "//proto/service/ephs",
+        "//utils/context",
         "//utils/log",
         "//utils/option",
         "//utils/stringmatch",
index 5591e7c94ccf3ed7e9a5883fc347920095f128ff..5bac2b0abb1b0f121f6f36a02fb0751bc5c7e911 100644 (file)
@@ -5,6 +5,7 @@ import (
        "errors"
        "fmt"
        "os"
+       "slices"
 
        "gopkg.in/yaml.v3"
 
@@ -13,15 +14,43 @@ import (
 )
 
 type AclRule struct {
-       Principal *stringmatch.NewSyntaxMatchRule `yaml:"principal"`
-       Key       *stringmatch.NewSyntaxMatchRule `yaml:"key"`
-       Invert    bool                            `yaml:"invert"`
+       Principal   *stringmatch.NewSyntaxMatchRule `yaml:"principal"`
+       Key         *stringmatch.NewSyntaxMatchRule `yaml:"key"`
+       Permissions *PermissionString               `yaml:"perm"`
+       Invert      bool                            `yaml:"invert"`
 }
 
 type Acl struct {
        Rules []*AclRule `yaml:"rules"`
 }
 
+type PermissionString string
+type PermissionBit byte
+
+const (
+       PermissionRead  PermissionBit = 'r'
+       PermissionWrite PermissionBit = 'w'
+)
+
+var permissionBytes = []byte{
+       byte(PermissionRead),
+       byte(PermissionWrite),
+}
+
+func (p *PermissionString) UnmarshalYAML(node *yaml.Node) error {
+       for _, b := range []byte(node.Value) {
+               if !slices.Contains(permissionBytes, b) {
+                       return fmt.Errorf("unknown permission bit: %s", string(b))
+               }
+       }
+       *p = PermissionString(node.Value)
+       return nil
+}
+
+func (p *PermissionString) Has(b PermissionBit) bool {
+       return slices.Contains([]PermissionBit(*p), b)
+}
+
 func LoadAcl(filePath string) (*Acl, error) {
        fp, err := os.OpenFile(filePath, os.O_RDONLY, os.FileMode(0))
        if err != nil {
@@ -57,7 +86,7 @@ func LoadAclString(text string) (*Acl, error) {
        return acl, nil
 }
 
-func (r *AclRule) Match(principal, key string) bool {
+func (r *AclRule) Match(principal, key string, op PermissionBit) bool {
        if !r.Principal.Match(principal) {
                return false
        }
@@ -77,12 +106,20 @@ func (r *AclRule) Match(principal, key string) bool {
 
        keyMatcher = keyMatcher.Sub(vars)
 
-       return keyMatcher.Match(key)
+       if !keyMatcher.Match(key) {
+               return false
+       }
+
+       if r.Permissions != nil && !r.Permissions.Has(op) {
+               return false
+       }
+
+       return true
 }
 
-func (a *Acl) Check(principal, key string) bool {
+func (a *Acl) Check(principal, key string, op PermissionBit) bool {
        for _, r := range a.Rules {
-               if r.Match(principal, key) {
+               if r.Match(principal, key, op) {
                        return !r.Invert
                }
        }
index 6dba593e5cd8318399cba83b6fde48eacccc1aec..a70dc0cf87a6d6430def102be168b25ba72f3ca7 100644 (file)
@@ -12,6 +12,7 @@ rules:
       exact: spiffe://domain.example.com/user/bob
     key:
       exact: "{{name}}/can/write/this"
+    perm: rw
   - principal:
       any: true
     key:
@@ -22,10 +23,12 @@ rules:
         - exact: spiffe://domain.example.com/user/suzie
     key:
       prefix: or/test/{{name}}/
+    perm: r
   - principal:
       prefix: spiffe://domain.example.com/service/
     key:
       exact: acl/{{name}}.yaml
+    perm: r
 `
 
 func TestAcl(t *testing.T) {
@@ -33,6 +36,7 @@ func TestAcl(t *testing.T) {
 
        type testCase struct {
                description, princ, key string
+               op                      PermissionBit
                expect                  bool
        }
 
@@ -44,81 +48,107 @@ func TestAcl(t *testing.T) {
 
        testCases := []testCase{
                {
-                       "bob has access to bob/can/write/this",
+                       "bob has read access to bob/can/write/this",
                        "spiffe://domain.example.com/user/bob",
                        "bob/can/write/this",
+                       PermissionRead,
+                       true,
+               },
+               {
+                       "bob has write access to bob/can/write/this",
+                       "spiffe://domain.example.com/user/bob",
+                       "bob/can/write/this",
+                       PermissionWrite,
                        true,
                },
                {
                        "bob does not have access to suzie/can/write/this",
                        "spiffe://domain.example.com/user/bob",
                        "suzie/can/write/this",
+                       PermissionRead,
                        false,
                },
                {
                        "suzie does not have access to suzie/can/write/this",
                        "spiffe://domain.example.com/user/suzie",
                        "suzie/can/write/this",
+                       PermissionRead,
                        false,
                },
                {
                        "bob has access to writable/by/self/bob",
                        "spiffe://domain.example.com/user/bob",
                        "writable/by/self/bob",
+                       PermissionRead,
                        true,
                },
                {
                        "suzie has access to writable/by/self/suzie",
                        "spiffe://domain.example.com/user/suzie",
                        "writable/by/self/suzie",
+                       PermissionRead,
                        true,
                },
                {
                        "bob has access to writable/by/self/suzie",
                        "spiffe://domain.example.com/user/suzie",
                        "writable/by/self/suzie",
+                       PermissionRead,
                        true,
                },
                {
                        "bob has access to or/test/bob/foo",
                        "spiffe://domain.example.com/user/bob",
                        "or/test/bob/foo",
+                       PermissionRead,
                        true,
                },
                {
                        "suzie has access to or/test/suzie/foo",
                        "spiffe://domain.example.com/user/suzie",
                        "or/test/suzie/foo",
+                       PermissionRead,
                        true,
                },
                {
                        "frank does not have access to or/test/frank/foo",
                        "spiffe://domain.example.com/user/frank",
                        "or/test/frank/foo",
+                       PermissionRead,
                        false,
                },
                {
                        "bob does not have access no/rule/for/this",
                        "spiffe://domain.example.com/user/bob",
                        "no/rule/for/this",
+                       PermissionRead,
                        false,
                },
                {
-                       "bob has access to acl/bob.yaml",
+                       "bob has read access to acl/bob.yaml",
                        "spiffe://domain.example.com/service/bob",
                        "acl/bob.yaml",
+                       PermissionRead,
                        true,
                },
+               {
+                       "bob does not have write access to acl/bob.yaml",
+                       "spiffe://domain.example.com/service/bob",
+                       "acl/bob.yaml",
+                       PermissionWrite,
+                       false,
+               },
                {
                        "bob has no access to acl/frank.yaml",
                        "spiffe://domain.example.com/service/bob",
                        "acl/frank.yaml",
+                       PermissionRead,
                        false,
                },
        }
 
        for i, tc := range testCases {
-               assert.Equal(tc.expect, acl.Check(tc.princ, tc.key),
+               assert.Equal(tc.expect, acl.Check(tc.princ, tc.key, tc.op),
                        "test case %d: %s", i, tc.description)
        }
 }
index 59eb0bf4af1755e9902e29ffc2f84ebe250a852b..9c6f3343ff4d4aa9ac53b9fb12e6d9541dbf070f 100644 (file)
@@ -8,7 +8,6 @@ package servicer
 
 import (
        "bytes"
-       "context"
        "errors"
        "io"
 
@@ -24,6 +23,7 @@ import (
        "go.fuhry.dev/runtime/mtls"
        "go.fuhry.dev/runtime/mtls/fsnotify"
        ephs_proto "go.fuhry.dev/runtime/proto/service/ephs"
+       "go.fuhry.dev/runtime/utils/context"
        "go.fuhry.dev/runtime/utils/log"
        "go.fuhry.dev/runtime/utils/option"
 )
@@ -34,6 +34,19 @@ type msgWithPath interface {
        GetPath() string
 }
 
+type EphsServer interface {
+       ephs_proto.EphsServer
+
+       // InstallClient creates a client that implements the ephs.Client interface,
+       // but using the low-level client directly instead of the gRPC client. The
+       // default ephs client is overridden to use this implementation.
+       //
+       // This is used to solve the circular dependency of "to use ephs, ephs must
+       // be available" - allowing functionality like retrieving ACLs from ephs even
+       // if there are no currently healthy ephs servers.
+       InstallClient()
+}
+
 type ephsServicer struct {
        ephs_proto.EphsServer
 
@@ -98,7 +111,7 @@ func WithAWSCredentialFile(filename string) Option {
        })
 }
 
-func NewEphsServicer(opts ...Option) (ephs_proto.EphsServer, error) {
+func NewEphsServicer(opts ...Option) (EphsServer, error) {
        serv := &ephsServicer{
                logger: log.Default().WithPrefix("ephsServicer"),
        }
@@ -118,7 +131,12 @@ func NewEphsServicer(opts ...Option) (ephs_proto.EphsServer, error) {
        return serv, nil
 }
 
-func (s *ephsServicer) checkPath(ctx context.Context, p string) (ephs.Path, error) {
+func (s *ephsServicer) InstallClient() {
+       fec := s.ll.HighLevelClient()
+       ephs.OverrideDefaultClient(fec)
+}
+
+func (s *ephsServicer) checkPath(ctx context.Context, p string, opType PermissionBit) (ephs.Path, error) {
        ephsPath, err := ephs.ParsePath(p)
        if err != nil {
                return nil, status.Errorf(
@@ -133,7 +151,7 @@ func (s *ephsServicer) checkPath(ctx context.Context, p string) (ephs.Path, erro
        }
 
        if s.acl != nil {
-               if !s.acl.Check(ident, ephsPath.AclPath()) {
+               if !s.acl.Check(ident, ephsPath.AclPath(), opType) {
                        return nil, status.Errorf(
                                codes.PermissionDenied,
                                "access to path %q denied for %q (rule path: %q)",
@@ -146,9 +164,18 @@ func (s *ephsServicer) checkPath(ctx context.Context, p string) (ephs.Path, erro
        return ephsPath, nil
 }
 
+func (s *ephsServicer) logRequest(ctx context.Context, method string, req msgWithPath) {
+       id, err := identityFromContext(ctx)
+       if err != nil {
+               return
+       }
+
+       s.logger.AppendPrefix("]["+id).Infof("%s(%q)", method, req.GetPath())
+}
+
 func (s *ephsServicer) Stat(ctx context.Context, req *ephs_proto.GetRequest) (*ephs_proto.StatResponse, error) {
        s.logRequest(ctx, "Stat", req)
-       ephsPath, err := s.checkPath(ctx, req.Path)
+       ephsPath, err := s.checkPath(ctx, req.Path, PermissionRead)
        if err != nil {
                return nil, err
        }
@@ -163,20 +190,11 @@ func (s *ephsServicer) Stat(ctx context.Context, req *ephs_proto.GetRequest) (*e
        }, nil
 }
 
-func (s *ephsServicer) logRequest(ctx context.Context, method string, req msgWithPath) {
-       id, err := identityFromContext(ctx)
-       if err != nil {
-               return
-       }
-
-       s.logger.AppendPrefix("]["+id).Infof("%s(%q)", method, req.GetPath())
-}
-
 func (s *ephsServicer) Get(req *ephs_proto.GetRequest, server ephs_proto.Ephs_GetServer) error {
        ctx := server.Context()
        s.logRequest(ctx, "Get", req)
 
-       ephsPath, err := s.checkPath(ctx, req.Path)
+       ephsPath, err := s.checkPath(ctx, req.Path, PermissionRead)
        if err != nil {
                return err
        }
@@ -184,7 +202,7 @@ func (s *ephsServicer) Get(req *ephs_proto.GetRequest, server ephs_proto.Ephs_Ge
        entry, reader, err := s.ll.Get(ctx, ephsPath)
        if err != nil {
                if errors.Is(err, ephsll.ErrDirectory) {
-                       return status.Errorf(
+                       err = status.Errorf(
                                codes.FailedPrecondition,
                                "%q: is a directory",
                                req.Path)
@@ -213,24 +231,26 @@ func (s *ephsServicer) Get(req *ephs_proto.GetRequest, server ephs_proto.Ephs_Ge
 }
 
 func (s *ephsServicer) MkDir(ctx context.Context, req *ephs_proto.MkDirRequest) (*emptypb.Empty, error) {
-       path, err := s.checkPath(ctx, req.Path)
+       path, err := s.checkPath(ctx, req.Path, PermissionWrite)
        if err != nil {
                return nil, err
        }
 
        whoami, _ := identityFromContext(ctx)
 
-       return &emptypb.Empty{}, s.ll.Mkdir(ctx, path, whoami, req.Recursive)
+       err = s.ll.Mkdir(ctx, path, whoami, req.Recursive)
+       return &emptypb.Empty{}, err
 }
 
 func (s *ephsServicer) Delete(ctx context.Context, req *ephs_proto.DeleteRequest) (*emptypb.Empty, error) {
        s.logRequest(ctx, "Delete", req)
-       ephsPath, err := s.checkPath(ctx, req.Path)
+       ephsPath, err := s.checkPath(ctx, req.Path, PermissionWrite)
        if err != nil {
                return nil, err
        }
 
-       if err := s.ll.Delete(ctx, ephsPath, req.Recursive); err != nil {
+       err = s.ll.Delete(ctx, ephsPath, req.Recursive)
+       if err != nil {
                return nil, err
        }
 
@@ -251,7 +271,7 @@ func (s *ephsServicer) Put(server ephs_proto.Ephs_PutServer) error {
                return err
        }
 
-       ephsPath, err := s.checkPath(ctx, msg.Path)
+       ephsPath, err := s.checkPath(ctx, msg.Path, PermissionWrite)
        if err != nil {
                return err
        }
@@ -267,13 +287,14 @@ func (s *ephsServicer) Put(server ephs_proto.Ephs_PutServer) error {
        response := &ephs_proto.StatResponse{
                Entry: ent,
        }
-       return server.SendAndClose(response)
+       err = server.SendAndClose(response)
+       return err
 }
 
 func (s *ephsServicer) Watch(req *ephs_proto.GetRequest, server ephs_proto.Ephs_WatchServer) error {
        ctx := server.Context()
        s.logRequest(ctx, "Watch", req)
-       ephsPath, err := s.checkPath(ctx, req.Path)
+       ephsPath, err := s.checkPath(ctx, req.Path, PermissionRead)
        if err != nil {
                return err
        }