From: Dan Fuhry Date: Sat, 14 Mar 2026 23:14:05 +0000 (-0400) Subject: [ephs] split out low level client, add r/w permissions X-Git-Url: https://go.fuhry.dev/?a=commitdiff_plain;h=34adbb4dad781821ad38b2a44a28e36508ca6b4d;p=runtime.git [ephs] split out low level client, add r/w permissions ephsll contains the etcd interaction parts of ephs, and includes an `ephs.Client` implementation which can replace the default client for a process that has unrestricted access to the etcd prefix. ACLs for ephs now also gain mode bits for read and write access separately, allowing subtrees or individual paths to be world-readable but writable only by the owner. --- diff --git a/cmd/ephs_client/BUILD.bazel b/cmd/ephs_client/BUILD.bazel index 906286d..1f4550e 100644 --- a/cmd/ephs_client/BUILD.bazel +++ b/cmd/ephs_client/BUILD.bazel @@ -8,6 +8,8 @@ go_library( deps = [ "//constants", "//ephs", + "//ephs/ephsll", + "//mtls", "//utils/context", "//utils/fsutil", "//utils/log", diff --git a/cmd/ephs_client/main.go b/cmd/ephs_client/main.go index 93b4f6d..c90f904 100644 --- a/cmd/ephs_client/main.go +++ b/cmd/ephs_client/main.go @@ -16,13 +16,31 @@ import ( "go.fuhry.dev/runtime/constants" "go.fuhry.dev/runtime/ephs" + "go.fuhry.dev/runtime/ephs/ephsll" + "go.fuhry.dev/runtime/mtls" "go.fuhry.dev/runtime/utils/context" "go.fuhry.dev/runtime/utils/fsutil" "go.fuhry.dev/runtime/utils/log" ) +func ephsClient(cmd *cli.Command) (ephs.Client, error) { + /* + if !cmd.Bool("ll") { + return ephs.DefaultClient() + } + */ + + identity := mtls.DefaultIdentity() + ll, err := ephsll.NewEphsLowLevelClient(identity) + if err != nil { + return nil, err + } + + return ll.HighLevelClient(), nil +} + func cmdStat(ctx context.Context, cmd *cli.Command) error { - client, err := ephs.DefaultClient() + client, err := ephsClient(cmd) if err != nil { return err } @@ -37,7 +55,7 @@ func cmdStat(ctx context.Context, cmd *cli.Command) error { } func cmdCat(ctx context.Context, cmd *cli.Command) error { - client, err := ephs.DefaultClient() + client, err := ephsClient(cmd) if err != nil { return err } @@ -53,7 +71,7 @@ func cmdCat(ctx context.Context, cmd *cli.Command) error { } func cmdCopy(ctx context.Context, cmd *cli.Command) error { - client, err := ephs.DefaultClient() + client, err := ephsClient(cmd) if err != nil { return err } @@ -112,7 +130,7 @@ func cmdCopy(ctx context.Context, cmd *cli.Command) error { } func cmdDelete(ctx context.Context, cmd *cli.Command) error { - client, err := ephs.DefaultClient() + client, err := ephsClient(cmd) if err != nil { return err } @@ -127,7 +145,7 @@ func cmdDelete(ctx context.Context, cmd *cli.Command) error { } func cmdWatch(ctx context.Context, cmd *cli.Command) error { - client, err := ephs.DefaultClient() + client, err := ephsClient(cmd) if err != nil { return err } @@ -144,7 +162,7 @@ func cmdWatch(ctx context.Context, cmd *cli.Command) error { } func cmdMkdir(ctx context.Context, cmd *cli.Command) error { - client, err := ephs.DefaultClient() + client, err := ephsClient(cmd) if err != nil { return err } @@ -159,7 +177,7 @@ func cmdMkdir(ctx context.Context, cmd *cli.Command) error { } func cmdEdit(ctx context.Context, cmd *cli.Command) error { - client, err := ephs.DefaultClient() + client, err := ephsClient(cmd) if err != nil { return err } @@ -259,12 +277,17 @@ func main() { ctx, cancel := context.Interruptible() defer cancel() - flag.Parse() - + exe, _ := os.Executable() cmd := &cli.Command{ - Name: "ephs", - Version: constants.Version, - Description: "interact with ephs", + Name: path.Base(exe), + Version: constants.Version, + Description: "interact with ephs", + AllowExtFlags: true, + EnableShellCompletion: true, + Before: func(ctx context.Context, c *cli.Command) (context.Context, error) { + flag.CommandLine.Parse([]string{}) + return ctx, nil + }, Commands: []*cli.Command{ { @@ -364,25 +387,15 @@ func main() { }, Flags: []cli.Flag{ - &cli.IntFlag{ - Name: "vv", - Usage: "verbosity level", - }, - &cli.StringFlag{ - Name: "v", - Usage: "log level", - }, - &cli.StringFlag{ - Name: "grpc.transport", - Usage: "grpc transport (tcp or quic)", + &cli.BoolFlag{ + Name: "ll", + Aliases: []string{"low-level"}, + Usage: "use low-level client, communicating directly with etcd instead of looking for an ephs server", }, }, } - args := os.Args[:1] - args = append(args, flag.Args()...) - - if err := cmd.Run(ctx, args); err != nil { + if err := cmd.Run(ctx, os.Args); err != nil { log.Fatal(err) os.Exit(1) } diff --git a/cmd/ephs_server/main.go b/cmd/ephs_server/main.go index 79685cc..c4bb9ed 100644 --- a/cmd/ephs_server/main.go +++ b/cmd/ephs_server/main.go @@ -26,11 +26,6 @@ func main() { flag.Parse() serverIdentity := mtls.DefaultIdentity() - s, err := grpc.NewGrpcServer(serverIdentity, grpc.WithTransport(&grpc.QUICConnectionFactory{})) - if err != nil { - log.Fatal(err) - os.Exit(1) - } var opts []servicer.Option if *acl != "" { @@ -49,6 +44,15 @@ func main() { os.Exit(1) } + serv.InstallClient() + + s, err := grpc.NewGrpcServer(serverIdentity, + grpc.WithTransport(&grpc.QUICConnectionFactory{})) + if err != nil { + log.Fatal(err) + os.Exit(1) + } + ctx, cancel := context.Interruptible() defer cancel() @@ -63,7 +67,3 @@ func main() { <-ctx.Done() } - -func init() { - mtls.SetDefaultIdentity("ephs") -} diff --git a/ephs/client.go b/ephs/client.go index 5a1943d..c2006c5 100644 --- a/ephs/client.go +++ b/ephs/client.go @@ -78,6 +78,8 @@ type getReader struct { cancel context.CancelFunc } +var logger = log.WithPrefix("ephs.Client") + var DefaultClientContext context.Context var ephsQuicConfig = &quic.Config{ @@ -206,7 +208,18 @@ func DefaultClient() (Client, error) { return defaultClient, nil } +func OverrideDefaultClient(c Client) { + defaultClientMu.Lock() + defer defaultClientMu.Unlock() + + defaultClient = c +} + func NewClient(ctx context.Context, localId mtls.Identity, opts ...ClientOption) (Client, error) { + if localId.Class() == mtls.AnonymousPrincipal { + return nil, fmt.Errorf("ephs is not available with anonymous authentication") + } + cl := &clientImpl{ defaultCtx: ctx, defaultTimeout: 15 * time.Second, @@ -448,16 +461,16 @@ func (c *clientImpl) watch(origCtx context.Context, origPath string, stream ephs if rpc, err := c.grpcClient(); err == nil { stream, err = rpc.Watch(origCtx, &ephs_pb.GetRequest{Path: origPath}) if err == nil { - log.Default().Noticef("reconnected watcher with new stream") + logger.Noticef("reconnected watcher with new stream") continue } else { - log.Default().Errorf("error reestablishing watch stream: %v", err) + logger.Errorf("error reestablishing watch stream: %v", err) } } else { - log.Default().Errorf("error reconnecting: %v", err) + logger.Errorf("error reconnecting: %v", err) } } - log.Default().Errorf("error receiving watch stream: %v", err) + logger.Errorf("error receiving watch stream: %v", err) return } @@ -495,10 +508,10 @@ func (c *clientImpl) grpcClient() (ephs_pb.EphsClient, error) { if lastErr == nil { break } - log.Default().Warningf("error establishing grpc connection to ephs server, retrying in 1s: %T: %v", lastErr, lastErr) + logger.Warningf("error establishing grpc connection to ephs server, retrying in 1s: %T: %v", lastErr, lastErr) select { case <-c.defaultCtx.Done(): - log.Default().Error(c.defaultCtx.Err()) + logger.Error(c.defaultCtx.Err()) return nil, c.defaultCtx.Err() case <-time.NewTimer(1 * time.Second).C: } @@ -507,8 +520,8 @@ func (c *clientImpl) grpcClient() (ephs_pb.EphsClient, error) { return nil, lastErr } - confFsCl := ephs_pb.NewEphsClient(conn) - return confFsCl, nil + grpcCl := ephs_pb.NewEphsClient(conn) + return grpcCl, nil } func humanFilesize(s uint64) string { diff --git a/ephs/ephsll/BUILD.bazel b/ephs/ephsll/BUILD.bazel index 8601f28..3794d7e 100644 --- a/ephs/ephsll/BUILD.bazel +++ b/ephs/ephsll/BUILD.bazel @@ -1,8 +1,10 @@ -load("@rules_go//go:def.bzl", "go_library") +load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "ephsll", srcs = [ + "client.go", + "errors.go", "fs_object.go", "low_level_client.go", "s3.go", @@ -15,6 +17,7 @@ go_library( "//mtls", "//proto/service/ephs", "//sd", + "//utils/context", "//utils/log", "//utils/option", "@com_github_minio_minio_go_v7//:minio-go", @@ -27,3 +30,20 @@ go_library( "@org_golang_google_protobuf//types/known/timestamppb", ], ) + +go_test( + name = "ephsll_test", + srcs = [ + "all_test.go", + "low_level_client_test.go", + ], + embed = [":ephsll"], + deps = [ + "//ephs", + "//mtls", + "//testlibs/etcdtest", + "//utils/log", + "@in_gopkg_check_v1//:check_v1", + "@io_etcd_go_etcd_client_v3//:client", + ], +) diff --git a/ephs/ephsll/all_test.go b/ephs/ephsll/all_test.go new file mode 100644 index 0000000..09d5f7f --- /dev/null +++ b/ephs/ephsll/all_test.go @@ -0,0 +1,68 @@ +package ephsll + +import ( + "context" + "testing" + "time" + + etcd_client "go.etcd.io/etcd/client/v3" + "go.fuhry.dev/runtime/ephs" + "go.fuhry.dev/runtime/mtls" + "go.fuhry.dev/runtime/testlibs/etcdtest" + "go.fuhry.dev/runtime/utils/log" + . "gopkg.in/check.v1" +) + +const testCell = "ephs.test.invalid" + +type EphsLowLevelTestSuite struct { + etcd etcdtest.EtcdTestServer + llc EphsLowLevelClient + ctx context.Context + cancel context.CancelFunc + id mtls.Identity +} + +var s = &EphsLowLevelTestSuite{} +var _ = Suite(s) + +func Test(t *testing.T) { + TestingT(t) +} + +func (s *EphsLowLevelTestSuite) SetUpSuite(c *C) { + s.ctx, s.cancel = context.WithTimeout(context.Background(), 3*time.Second) + s.id = mtls.NewLazyIdentity(mtls.ServicePrincipal, "ephs") + + ephs.OverrideDefaultCell(testCell) + + ms, err := etcdtest.NewEtcdTestServer() + c.Assert(err, IsNil) + + s.etcd = ms + + llc := &ephsLowLevelClientImpl{ + id: s.id, + logger: log.WithPrefix("EphsLowLevelClient"), + clients: map[string]*etcd_client.Client{ + testCell: s.etcdClient(c), + }, + } + s.llc = llc +} + +func (s *EphsLowLevelTestSuite) TearDownSuite(c *C) { + if s.etcd != nil { + s.etcd.Close() + } + s.cancel() +} + +func (s *EphsLowLevelTestSuite) etcdClient(c *C) *etcd_client.Client { + c.Assert(s.etcd, NotNil) + + client, err := s.etcd.Client() + c.Assert(err, IsNil) + + return client +} diff --git a/ephs/ephsll/client.go b/ephs/ephsll/client.go new file mode 100644 index 0000000..5d8d355 --- /dev/null +++ b/ephs/ephsll/client.go @@ -0,0 +1,142 @@ +package ephsll + +import ( + "context" + "io" + + "go.fuhry.dev/runtime/ephs" + "go.fuhry.dev/runtime/mtls" + ephs_proto "go.fuhry.dev/runtime/proto/service/ephs" +) + +type ephsllFrontendClient struct { + baseCtx context.Context + ll EphsLowLevelClient + localId mtls.Identity +} + +var _ ephs.Client = &ephsllFrontendClient{} + +func newFrontendClient(baseCtx context.Context, ll EphsLowLevelClient, localId mtls.Identity) ephs.Client { + return &ephsllFrontendClient{ + baseCtx: baseCtx, + ll: ll, + localId: localId, + } +} + +// Get retrieves the content of a file at the specified path +func (c *ephsllFrontendClient) Get(path string) (io.Reader, error) { + return c.GetContext(c.baseCtx, path) +} + +// GetContext retrieves the content of a file at the specified path with context +func (c *ephsllFrontendClient) GetContext(ctx context.Context, path string) (io.ReadCloser, error) { + ephsPath, err := ephs.ParsePath(path) + if err != nil { + return nil, err + } + + entry, reader, err := c.ll.Get(ctx, ephsPath) + if err != nil { + return nil, err + } + + // Return a wrapper that properly handles closing + return &ephsReadCloser{ + reader: reader, + entry: entry, + }, nil +} + +// Stat retrieves metadata about a file or directory at the specified path +func (c *ephsllFrontendClient) Stat(path string) (*ephs_proto.FsEntry, error) { + return c.StatContext(c.baseCtx, path) +} + +// StatContext retrieves metadata about a file or directory at the specified path with context +func (c *ephsllFrontendClient) StatContext(ctx context.Context, path string) (*ephs_proto.FsEntry, error) { + ephsPath, err := ephs.ParsePath(path) + if err != nil { + return nil, err + } + + return c.ll.Stat(ctx, ephsPath) +} + +// Put writes content to a file at the specified path +func (c *ephsllFrontendClient) Put(path string, size uint64, content io.Reader) (*ephs_proto.FsEntry, error) { + return c.PutContext(c.baseCtx, path, size, content) +} + +// PutContext writes content to a file at the specified path with context +func (c *ephsllFrontendClient) PutContext(ctx context.Context, path string, size uint64, content io.Reader) (*ephs_proto.FsEntry, error) { + ephsPath, err := ephs.ParsePath(path) + if err != nil { + return nil, err + } + + return c.ll.Put(ctx, ephsPath, c.localId.Name(), 0, size, content) +} + +// Delete removes a file or directory at the specified path +func (c *ephsllFrontendClient) Delete(path string, recursive bool) error { + return c.DeleteContext(c.baseCtx, path, recursive) +} + +// DeleteContext removes a file or directory at the specified path with context +func (c *ephsllFrontendClient) DeleteContext(ctx context.Context, path string, recursive bool) error { + ephsPath, err := ephs.ParsePath(path) + if err != nil { + return err + } + + return c.ll.Delete(ctx, ephsPath, recursive) +} + +// MkDir creates a directory at the specified path +func (c *ephsllFrontendClient) MkDir(path string, recursive bool) error { + return c.MkDirContext(c.baseCtx, path, recursive) +} + +// MkDirContext creates a directory at the specified path with context +func (c *ephsllFrontendClient) MkDirContext(ctx context.Context, path string, recursive bool) error { + ephsPath, err := ephs.ParsePath(path) + if err != nil { + return err + } + + return c.ll.Mkdir(ctx, ephsPath, c.localId.Name(), recursive) +} + +// Watch watches for changes to a file or directory at the specified path +func (c *ephsllFrontendClient) Watch(ctx context.Context, path string) (<-chan *ephs_proto.WatchResponse, error) { + ephsPath, err := ephs.ParsePath(path) + if err != nil { + return nil, err + } + + return c.ll.Watch(ctx, ephsPath) +} + +// ephsReadCloser wraps an io.Reader to provide proper Close behavior +type ephsReadCloser struct { + reader io.Reader + entry *ephs_proto.FsEntry + closed bool +} + +func (r *ephsReadCloser) Read(p []byte) (int, error) { + if r.closed { + return 0, io.EOF + } + return r.reader.Read(p) +} + +func (r *ephsReadCloser) Close() error { + if rc, ok := r.reader.(io.ReadCloser); ok { + r.closed = true + return rc.Close() + } + return nil +} diff --git a/ephs/ephsll/errors.go b/ephs/ephsll/errors.go new file mode 100644 index 0000000..e10ffdf --- /dev/null +++ b/ephs/ephsll/errors.go @@ -0,0 +1,23 @@ +package ephsll + +import ( + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func IsAlreadyExists(err error) bool { + return errIsCode(err, codes.AlreadyExists) +} + +func errIsCode(err error, code codes.Code) bool { + if err == nil { + return false + } + + if st, ok := status.FromError(err); ok { + if st.Code() == code { + return true + } + } + return false +} diff --git a/ephs/ephsll/low_level_client.go b/ephs/ephsll/low_level_client.go index 8fd085f..f67b26a 100644 --- a/ephs/ephsll/low_level_client.go +++ b/ephs/ephsll/low_level_client.go @@ -5,7 +5,6 @@ package ephsll import ( - "context" "fmt" "io" "slices" @@ -24,6 +23,7 @@ import ( "go.fuhry.dev/runtime/mtls" ephs_proto "go.fuhry.dev/runtime/proto/service/ephs" "go.fuhry.dev/runtime/sd" + "go.fuhry.dev/runtime/utils/context" "go.fuhry.dev/runtime/utils/log" "go.fuhry.dev/runtime/utils/option" ) @@ -31,6 +31,7 @@ import ( type Option = option.Option[*ephsLowLevelClientImpl] type EphsLowLevelClient interface { + HighLevelClient() ephs.Client Stat(ctx context.Context, path ephs.Path) (*ephs_proto.FsEntry, error) Get(ctx context.Context, path ephs.Path) (*ephs_proto.FsEntry, io.Reader, error) Mkdir(ctx context.Context, path ephs.Path, owner string, recursive bool) error @@ -76,6 +77,12 @@ func NewEphsLowLevelClient(id mtls.Identity, opts ...Option) (EphsLowLevelClient return ll, nil } +func (c *ephsLowLevelClientImpl) HighLevelClient() ephs.Client { + ctx, _ := context.Interruptible() + + return newFrontendClient(ctx, c, c.id) +} + // Stat func (c *ephsLowLevelClientImpl) Stat(ctx context.Context, path ephs.Path) (*ephs_proto.FsEntry, error) { ent, err := c.getProtoForPath(ctx, path) diff --git a/ephs/ephsll/low_level_client_test.go b/ephs/ephsll/low_level_client_test.go new file mode 100644 index 0000000..503887e --- /dev/null +++ b/ephs/ephsll/low_level_client_test.go @@ -0,0 +1,59 @@ +package ephsll + +import ( + "bytes" + "io" + + "go.fuhry.dev/runtime/ephs" + . "gopkg.in/check.v1" +) + +func (s *EphsLowLevelTestSuite) TestMkdir(c *C) { + path, err := ephs.ParsePath("/ephs/local/test_dir") + c.Assert(err, IsNil) + + err = s.llc.Mkdir(s.ctx, path, s.id.Name(), true) + c.Assert(err, IsNil) +} + +func (s *EphsLowLevelTestSuite) TestPut(c *C) { + path, err := ephs.ParsePath("/ephs/local/test_put") + c.Assert(err, IsNil) + + err = s.llc.Mkdir(s.ctx, path.Parent(), s.id.Name(), true) + if err != nil { + c.Assert(IsAlreadyExists(err), Equals, true) + } + + contents := bytes.NewBufferString("test ephs file contents") + fse, err := s.llc.Put(s.ctx, path, s.id.Name(), 0, uint64(contents.Len()), contents) + c.Assert(err, IsNil) + c.Assert(fse.GetContent().GetFile(), NotNil) +} + +func (s *EphsLowLevelTestSuite) TestGet(c *C) { + path, err := ephs.ParsePath("/ephs/local/test_get") + c.Assert(err, IsNil) + + err = s.llc.Mkdir(s.ctx, path.Parent(), s.id.Name(), true) + if err != nil { + c.Assert(IsAlreadyExists(err), Equals, true) + } + + contents := []byte("test ephs file contents") + size := uint64(len(contents)) + _, err = s.llc.Put(s.ctx, path, s.id.Name(), 1, size, bytes.NewBuffer(contents)) + c.Assert(err, IsNil) + + fse, reader, err := s.llc.Get(s.ctx, path) + c.Assert(err, IsNil) + c.Assert(fse.GetVersion(), Equals, uint64(1)) + c.Assert(fse.GetContent(), NotNil) + c.Assert(fse.GetContent().GetFile(), NotNil) + c.Assert(fse.GetSize(), Equals, size) + + readContents, err := io.ReadAll(reader) + c.Assert(err, IsNil) + + c.Assert(readContents, DeepEquals, contents) +} diff --git a/ephs/path.go b/ephs/path.go index 3cf3058..7562a93 100644 --- a/ephs/path.go +++ b/ephs/path.go @@ -40,6 +40,10 @@ func DefaultCell() string { return ephsDefaultCell } +func OverrideDefaultCell(cell string) { + ephsDefaultCell = cell +} + func IsEphsPath(p string) bool { if !strings.HasPrefix(p, KeyPrefix) { return false diff --git a/ephs/servicer/BUILD.bazel b/ephs/servicer/BUILD.bazel index 17213fb..3b7481a 100644 --- a/ephs/servicer/BUILD.bazel +++ b/ephs/servicer/BUILD.bazel @@ -16,6 +16,7 @@ go_library( "//mtls", "//mtls/fsnotify", "//proto/service/ephs", + "//utils/context", "//utils/log", "//utils/option", "//utils/stringmatch", diff --git a/ephs/servicer/acl.go b/ephs/servicer/acl.go index 5591e7c..5bac2b0 100644 --- a/ephs/servicer/acl.go +++ b/ephs/servicer/acl.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "os" + "slices" "gopkg.in/yaml.v3" @@ -13,15 +14,43 @@ import ( ) type AclRule struct { - Principal *stringmatch.NewSyntaxMatchRule `yaml:"principal"` - Key *stringmatch.NewSyntaxMatchRule `yaml:"key"` - Invert bool `yaml:"invert"` + Principal *stringmatch.NewSyntaxMatchRule `yaml:"principal"` + Key *stringmatch.NewSyntaxMatchRule `yaml:"key"` + Permissions *PermissionString `yaml:"perm"` + Invert bool `yaml:"invert"` } type Acl struct { Rules []*AclRule `yaml:"rules"` } +type PermissionString string +type PermissionBit byte + +const ( + PermissionRead PermissionBit = 'r' + PermissionWrite PermissionBit = 'w' +) + +var permissionBytes = []byte{ + byte(PermissionRead), + byte(PermissionWrite), +} + +func (p *PermissionString) UnmarshalYAML(node *yaml.Node) error { + for _, b := range []byte(node.Value) { + if !slices.Contains(permissionBytes, b) { + return fmt.Errorf("unknown permission bit: %s", string(b)) + } + } + *p = PermissionString(node.Value) + return nil +} + +func (p *PermissionString) Has(b PermissionBit) bool { + return slices.Contains([]PermissionBit(*p), b) +} + func LoadAcl(filePath string) (*Acl, error) { fp, err := os.OpenFile(filePath, os.O_RDONLY, os.FileMode(0)) if err != nil { @@ -57,7 +86,7 @@ func LoadAclString(text string) (*Acl, error) { return acl, nil } -func (r *AclRule) Match(principal, key string) bool { +func (r *AclRule) Match(principal, key string, op PermissionBit) bool { if !r.Principal.Match(principal) { return false } @@ -77,12 +106,20 @@ func (r *AclRule) Match(principal, key string) bool { keyMatcher = keyMatcher.Sub(vars) - return keyMatcher.Match(key) + if !keyMatcher.Match(key) { + return false + } + + if r.Permissions != nil && !r.Permissions.Has(op) { + return false + } + + return true } -func (a *Acl) Check(principal, key string) bool { +func (a *Acl) Check(principal, key string, op PermissionBit) bool { for _, r := range a.Rules { - if r.Match(principal, key) { + if r.Match(principal, key, op) { return !r.Invert } } diff --git a/ephs/servicer/acl_test.go b/ephs/servicer/acl_test.go index 6dba593..a70dc0c 100644 --- a/ephs/servicer/acl_test.go +++ b/ephs/servicer/acl_test.go @@ -12,6 +12,7 @@ rules: exact: spiffe://domain.example.com/user/bob key: exact: "{{name}}/can/write/this" + perm: rw - principal: any: true key: @@ -22,10 +23,12 @@ rules: - exact: spiffe://domain.example.com/user/suzie key: prefix: or/test/{{name}}/ + perm: r - principal: prefix: spiffe://domain.example.com/service/ key: exact: acl/{{name}}.yaml + perm: r ` func TestAcl(t *testing.T) { @@ -33,6 +36,7 @@ func TestAcl(t *testing.T) { type testCase struct { description, princ, key string + op PermissionBit expect bool } @@ -44,81 +48,107 @@ func TestAcl(t *testing.T) { testCases := []testCase{ { - "bob has access to bob/can/write/this", + "bob has read access to bob/can/write/this", "spiffe://domain.example.com/user/bob", "bob/can/write/this", + PermissionRead, + true, + }, + { + "bob has write access to bob/can/write/this", + "spiffe://domain.example.com/user/bob", + "bob/can/write/this", + PermissionWrite, true, }, { "bob does not have access to suzie/can/write/this", "spiffe://domain.example.com/user/bob", "suzie/can/write/this", + PermissionRead, false, }, { "suzie does not have access to suzie/can/write/this", "spiffe://domain.example.com/user/suzie", "suzie/can/write/this", + PermissionRead, false, }, { "bob has access to writable/by/self/bob", "spiffe://domain.example.com/user/bob", "writable/by/self/bob", + PermissionRead, true, }, { "suzie has access to writable/by/self/suzie", "spiffe://domain.example.com/user/suzie", "writable/by/self/suzie", + PermissionRead, true, }, { "bob has access to writable/by/self/suzie", "spiffe://domain.example.com/user/suzie", "writable/by/self/suzie", + PermissionRead, true, }, { "bob has access to or/test/bob/foo", "spiffe://domain.example.com/user/bob", "or/test/bob/foo", + PermissionRead, true, }, { "suzie has access to or/test/suzie/foo", "spiffe://domain.example.com/user/suzie", "or/test/suzie/foo", + PermissionRead, true, }, { "frank does not have access to or/test/frank/foo", "spiffe://domain.example.com/user/frank", "or/test/frank/foo", + PermissionRead, false, }, { "bob does not have access no/rule/for/this", "spiffe://domain.example.com/user/bob", "no/rule/for/this", + PermissionRead, false, }, { - "bob has access to acl/bob.yaml", + "bob has read access to acl/bob.yaml", "spiffe://domain.example.com/service/bob", "acl/bob.yaml", + PermissionRead, true, }, + { + "bob does not have write access to acl/bob.yaml", + "spiffe://domain.example.com/service/bob", + "acl/bob.yaml", + PermissionWrite, + false, + }, { "bob has no access to acl/frank.yaml", "spiffe://domain.example.com/service/bob", "acl/frank.yaml", + PermissionRead, false, }, } for i, tc := range testCases { - assert.Equal(tc.expect, acl.Check(tc.princ, tc.key), + assert.Equal(tc.expect, acl.Check(tc.princ, tc.key, tc.op), "test case %d: %s", i, tc.description) } } diff --git a/ephs/servicer/servicer.go b/ephs/servicer/servicer.go index 59eb0bf..9c6f334 100644 --- a/ephs/servicer/servicer.go +++ b/ephs/servicer/servicer.go @@ -8,7 +8,6 @@ package servicer import ( "bytes" - "context" "errors" "io" @@ -24,6 +23,7 @@ import ( "go.fuhry.dev/runtime/mtls" "go.fuhry.dev/runtime/mtls/fsnotify" ephs_proto "go.fuhry.dev/runtime/proto/service/ephs" + "go.fuhry.dev/runtime/utils/context" "go.fuhry.dev/runtime/utils/log" "go.fuhry.dev/runtime/utils/option" ) @@ -34,6 +34,19 @@ type msgWithPath interface { GetPath() string } +type EphsServer interface { + ephs_proto.EphsServer + + // InstallClient creates a client that implements the ephs.Client interface, + // but using the low-level client directly instead of the gRPC client. The + // default ephs client is overridden to use this implementation. + // + // This is used to solve the circular dependency of "to use ephs, ephs must + // be available" - allowing functionality like retrieving ACLs from ephs even + // if there are no currently healthy ephs servers. + InstallClient() +} + type ephsServicer struct { ephs_proto.EphsServer @@ -98,7 +111,7 @@ func WithAWSCredentialFile(filename string) Option { }) } -func NewEphsServicer(opts ...Option) (ephs_proto.EphsServer, error) { +func NewEphsServicer(opts ...Option) (EphsServer, error) { serv := &ephsServicer{ logger: log.Default().WithPrefix("ephsServicer"), } @@ -118,7 +131,12 @@ func NewEphsServicer(opts ...Option) (ephs_proto.EphsServer, error) { return serv, nil } -func (s *ephsServicer) checkPath(ctx context.Context, p string) (ephs.Path, error) { +func (s *ephsServicer) InstallClient() { + fec := s.ll.HighLevelClient() + ephs.OverrideDefaultClient(fec) +} + +func (s *ephsServicer) checkPath(ctx context.Context, p string, opType PermissionBit) (ephs.Path, error) { ephsPath, err := ephs.ParsePath(p) if err != nil { return nil, status.Errorf( @@ -133,7 +151,7 @@ func (s *ephsServicer) checkPath(ctx context.Context, p string) (ephs.Path, erro } if s.acl != nil { - if !s.acl.Check(ident, ephsPath.AclPath()) { + if !s.acl.Check(ident, ephsPath.AclPath(), opType) { return nil, status.Errorf( codes.PermissionDenied, "access to path %q denied for %q (rule path: %q)", @@ -146,9 +164,18 @@ func (s *ephsServicer) checkPath(ctx context.Context, p string) (ephs.Path, erro return ephsPath, nil } +func (s *ephsServicer) logRequest(ctx context.Context, method string, req msgWithPath) { + id, err := identityFromContext(ctx) + if err != nil { + return + } + + s.logger.AppendPrefix("]["+id).Infof("%s(%q)", method, req.GetPath()) +} + func (s *ephsServicer) Stat(ctx context.Context, req *ephs_proto.GetRequest) (*ephs_proto.StatResponse, error) { s.logRequest(ctx, "Stat", req) - ephsPath, err := s.checkPath(ctx, req.Path) + ephsPath, err := s.checkPath(ctx, req.Path, PermissionRead) if err != nil { return nil, err } @@ -163,20 +190,11 @@ func (s *ephsServicer) Stat(ctx context.Context, req *ephs_proto.GetRequest) (*e }, nil } -func (s *ephsServicer) logRequest(ctx context.Context, method string, req msgWithPath) { - id, err := identityFromContext(ctx) - if err != nil { - return - } - - s.logger.AppendPrefix("]["+id).Infof("%s(%q)", method, req.GetPath()) -} - func (s *ephsServicer) Get(req *ephs_proto.GetRequest, server ephs_proto.Ephs_GetServer) error { ctx := server.Context() s.logRequest(ctx, "Get", req) - ephsPath, err := s.checkPath(ctx, req.Path) + ephsPath, err := s.checkPath(ctx, req.Path, PermissionRead) if err != nil { return err } @@ -184,7 +202,7 @@ func (s *ephsServicer) Get(req *ephs_proto.GetRequest, server ephs_proto.Ephs_Ge entry, reader, err := s.ll.Get(ctx, ephsPath) if err != nil { if errors.Is(err, ephsll.ErrDirectory) { - return status.Errorf( + err = status.Errorf( codes.FailedPrecondition, "%q: is a directory", req.Path) @@ -213,24 +231,26 @@ func (s *ephsServicer) Get(req *ephs_proto.GetRequest, server ephs_proto.Ephs_Ge } func (s *ephsServicer) MkDir(ctx context.Context, req *ephs_proto.MkDirRequest) (*emptypb.Empty, error) { - path, err := s.checkPath(ctx, req.Path) + path, err := s.checkPath(ctx, req.Path, PermissionWrite) if err != nil { return nil, err } whoami, _ := identityFromContext(ctx) - return &emptypb.Empty{}, s.ll.Mkdir(ctx, path, whoami, req.Recursive) + err = s.ll.Mkdir(ctx, path, whoami, req.Recursive) + return &emptypb.Empty{}, err } func (s *ephsServicer) Delete(ctx context.Context, req *ephs_proto.DeleteRequest) (*emptypb.Empty, error) { s.logRequest(ctx, "Delete", req) - ephsPath, err := s.checkPath(ctx, req.Path) + ephsPath, err := s.checkPath(ctx, req.Path, PermissionWrite) if err != nil { return nil, err } - if err := s.ll.Delete(ctx, ephsPath, req.Recursive); err != nil { + err = s.ll.Delete(ctx, ephsPath, req.Recursive) + if err != nil { return nil, err } @@ -251,7 +271,7 @@ func (s *ephsServicer) Put(server ephs_proto.Ephs_PutServer) error { return err } - ephsPath, err := s.checkPath(ctx, msg.Path) + ephsPath, err := s.checkPath(ctx, msg.Path, PermissionWrite) if err != nil { return err } @@ -267,13 +287,14 @@ func (s *ephsServicer) Put(server ephs_proto.Ephs_PutServer) error { response := &ephs_proto.StatResponse{ Entry: ent, } - return server.SendAndClose(response) + err = server.SendAndClose(response) + return err } func (s *ephsServicer) Watch(req *ephs_proto.GetRequest, server ephs_proto.Ephs_WatchServer) error { ctx := server.Context() s.logRequest(ctx, "Watch", req) - ephsPath, err := s.checkPath(ctx, req.Path) + ephsPath, err := s.checkPath(ctx, req.Path, PermissionRead) if err != nil { return err }