ephsll contains the etcd interaction parts of ephs, and includes an `ephs.Client` implementation which can replace the default client for a process that has unrestricted access to the etcd prefix.
ACLs for ephs now also gain mode bits for read and write access separately, allowing subtrees or individual paths to be world-readable but writable only by the owner.
deps = [
"//constants",
"//ephs",
+ "//ephs/ephsll",
+ "//mtls",
"//utils/context",
"//utils/fsutil",
"//utils/log",
"go.fuhry.dev/runtime/constants"
"go.fuhry.dev/runtime/ephs"
+ "go.fuhry.dev/runtime/ephs/ephsll"
+ "go.fuhry.dev/runtime/mtls"
"go.fuhry.dev/runtime/utils/context"
"go.fuhry.dev/runtime/utils/fsutil"
"go.fuhry.dev/runtime/utils/log"
)
+func ephsClient(cmd *cli.Command) (ephs.Client, error) {
+ /*
+ if !cmd.Bool("ll") {
+ return ephs.DefaultClient()
+ }
+ */
+
+ identity := mtls.DefaultIdentity()
+ ll, err := ephsll.NewEphsLowLevelClient(identity)
+ if err != nil {
+ return nil, err
+ }
+
+ return ll.HighLevelClient(), nil
+}
+
func cmdStat(ctx context.Context, cmd *cli.Command) error {
- client, err := ephs.DefaultClient()
+ client, err := ephsClient(cmd)
if err != nil {
return err
}
}
func cmdCat(ctx context.Context, cmd *cli.Command) error {
- client, err := ephs.DefaultClient()
+ client, err := ephsClient(cmd)
if err != nil {
return err
}
}
func cmdCopy(ctx context.Context, cmd *cli.Command) error {
- client, err := ephs.DefaultClient()
+ client, err := ephsClient(cmd)
if err != nil {
return err
}
}
func cmdDelete(ctx context.Context, cmd *cli.Command) error {
- client, err := ephs.DefaultClient()
+ client, err := ephsClient(cmd)
if err != nil {
return err
}
}
func cmdWatch(ctx context.Context, cmd *cli.Command) error {
- client, err := ephs.DefaultClient()
+ client, err := ephsClient(cmd)
if err != nil {
return err
}
}
func cmdMkdir(ctx context.Context, cmd *cli.Command) error {
- client, err := ephs.DefaultClient()
+ client, err := ephsClient(cmd)
if err != nil {
return err
}
}
func cmdEdit(ctx context.Context, cmd *cli.Command) error {
- client, err := ephs.DefaultClient()
+ client, err := ephsClient(cmd)
if err != nil {
return err
}
ctx, cancel := context.Interruptible()
defer cancel()
- flag.Parse()
-
+ exe, _ := os.Executable()
cmd := &cli.Command{
- Name: "ephs",
- Version: constants.Version,
- Description: "interact with ephs",
+ Name: path.Base(exe),
+ Version: constants.Version,
+ Description: "interact with ephs",
+ AllowExtFlags: true,
+ EnableShellCompletion: true,
+ Before: func(ctx context.Context, c *cli.Command) (context.Context, error) {
+ flag.CommandLine.Parse([]string{})
+ return ctx, nil
+ },
Commands: []*cli.Command{
{
},
Flags: []cli.Flag{
- &cli.IntFlag{
- Name: "vv",
- Usage: "verbosity level",
- },
- &cli.StringFlag{
- Name: "v",
- Usage: "log level",
- },
- &cli.StringFlag{
- Name: "grpc.transport",
- Usage: "grpc transport (tcp or quic)",
+ &cli.BoolFlag{
+ Name: "ll",
+ Aliases: []string{"low-level"},
+ Usage: "use low-level client, communicating directly with etcd instead of looking for an ephs server",
},
},
}
- args := os.Args[:1]
- args = append(args, flag.Args()...)
-
- if err := cmd.Run(ctx, args); err != nil {
+ if err := cmd.Run(ctx, os.Args); err != nil {
log.Fatal(err)
os.Exit(1)
}
flag.Parse()
serverIdentity := mtls.DefaultIdentity()
- s, err := grpc.NewGrpcServer(serverIdentity, grpc.WithTransport(&grpc.QUICConnectionFactory{}))
- if err != nil {
- log.Fatal(err)
- os.Exit(1)
- }
var opts []servicer.Option
if *acl != "" {
os.Exit(1)
}
+ serv.InstallClient()
+
+ s, err := grpc.NewGrpcServer(serverIdentity,
+ grpc.WithTransport(&grpc.QUICConnectionFactory{}))
+ if err != nil {
+ log.Fatal(err)
+ os.Exit(1)
+ }
+
ctx, cancel := context.Interruptible()
defer cancel()
<-ctx.Done()
}
-
-func init() {
- mtls.SetDefaultIdentity("ephs")
-}
cancel context.CancelFunc
}
+var logger = log.WithPrefix("ephs.Client")
+
var DefaultClientContext context.Context
var ephsQuicConfig = &quic.Config{
return defaultClient, nil
}
+func OverrideDefaultClient(c Client) {
+ defaultClientMu.Lock()
+ defer defaultClientMu.Unlock()
+
+ defaultClient = c
+}
+
func NewClient(ctx context.Context, localId mtls.Identity, opts ...ClientOption) (Client, error) {
+ if localId.Class() == mtls.AnonymousPrincipal {
+ return nil, fmt.Errorf("ephs is not available with anonymous authentication")
+ }
+
cl := &clientImpl{
defaultCtx: ctx,
defaultTimeout: 15 * time.Second,
if rpc, err := c.grpcClient(); err == nil {
stream, err = rpc.Watch(origCtx, &ephs_pb.GetRequest{Path: origPath})
if err == nil {
- log.Default().Noticef("reconnected watcher with new stream")
+ logger.Noticef("reconnected watcher with new stream")
continue
} else {
- log.Default().Errorf("error reestablishing watch stream: %v", err)
+ logger.Errorf("error reestablishing watch stream: %v", err)
}
} else {
- log.Default().Errorf("error reconnecting: %v", err)
+ logger.Errorf("error reconnecting: %v", err)
}
}
- log.Default().Errorf("error receiving watch stream: %v", err)
+ logger.Errorf("error receiving watch stream: %v", err)
return
}
if lastErr == nil {
break
}
- log.Default().Warningf("error establishing grpc connection to ephs server, retrying in 1s: %T: %v", lastErr, lastErr)
+ logger.Warningf("error establishing grpc connection to ephs server, retrying in 1s: %T: %v", lastErr, lastErr)
select {
case <-c.defaultCtx.Done():
- log.Default().Error(c.defaultCtx.Err())
+ logger.Error(c.defaultCtx.Err())
return nil, c.defaultCtx.Err()
case <-time.NewTimer(1 * time.Second).C:
}
return nil, lastErr
}
- confFsCl := ephs_pb.NewEphsClient(conn)
- return confFsCl, nil
+ grpcCl := ephs_pb.NewEphsClient(conn)
+ return grpcCl, nil
}
func humanFilesize(s uint64) string {
-load("@rules_go//go:def.bzl", "go_library")
+load("@rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "ephsll",
srcs = [
+ "client.go",
+ "errors.go",
"fs_object.go",
"low_level_client.go",
"s3.go",
"//mtls",
"//proto/service/ephs",
"//sd",
+ "//utils/context",
"//utils/log",
"//utils/option",
"@com_github_minio_minio_go_v7//:minio-go",
"@org_golang_google_protobuf//types/known/timestamppb",
],
)
+
+go_test(
+ name = "ephsll_test",
+ srcs = [
+ "all_test.go",
+ "low_level_client_test.go",
+ ],
+ embed = [":ephsll"],
+ deps = [
+ "//ephs",
+ "//mtls",
+ "//testlibs/etcdtest",
+ "//utils/log",
+ "@in_gopkg_check_v1//:check_v1",
+ "@io_etcd_go_etcd_client_v3//:client",
+ ],
+)
--- /dev/null
+package ephsll
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ etcd_client "go.etcd.io/etcd/client/v3"
+ "go.fuhry.dev/runtime/ephs"
+ "go.fuhry.dev/runtime/mtls"
+ "go.fuhry.dev/runtime/testlibs/etcdtest"
+ "go.fuhry.dev/runtime/utils/log"
+ . "gopkg.in/check.v1"
+)
+
+const testCell = "ephs.test.invalid"
+
+type EphsLowLevelTestSuite struct {
+ etcd etcdtest.EtcdTestServer
+ llc EphsLowLevelClient
+ ctx context.Context
+ cancel context.CancelFunc
+ id mtls.Identity
+}
+
+var s = &EphsLowLevelTestSuite{}
+var _ = Suite(s)
+
+func Test(t *testing.T) {
+ TestingT(t)
+}
+
+func (s *EphsLowLevelTestSuite) SetUpSuite(c *C) {
+ s.ctx, s.cancel = context.WithTimeout(context.Background(), 3*time.Second)
+ s.id = mtls.NewLazyIdentity(mtls.ServicePrincipal, "ephs")
+
+ ephs.OverrideDefaultCell(testCell)
+
+ ms, err := etcdtest.NewEtcdTestServer()
+ c.Assert(err, IsNil)
+
+ s.etcd = ms
+
+ llc := &ephsLowLevelClientImpl{
+ id: s.id,
+ logger: log.WithPrefix("EphsLowLevelClient"),
+ clients: map[string]*etcd_client.Client{
+ testCell: s.etcdClient(c),
+ },
+ }
+ s.llc = llc
+}
+
+func (s *EphsLowLevelTestSuite) TearDownSuite(c *C) {
+ if s.etcd != nil {
+ s.etcd.Close()
+ }
+ s.cancel()
+}
+
+func (s *EphsLowLevelTestSuite) etcdClient(c *C) *etcd_client.Client {
+ c.Assert(s.etcd, NotNil)
+
+ client, err := s.etcd.Client()
+ c.Assert(err, IsNil)
+
+ return client
+}
--- /dev/null
+package ephsll
+
+import (
+ "context"
+ "io"
+
+ "go.fuhry.dev/runtime/ephs"
+ "go.fuhry.dev/runtime/mtls"
+ ephs_proto "go.fuhry.dev/runtime/proto/service/ephs"
+)
+
+type ephsllFrontendClient struct {
+ baseCtx context.Context
+ ll EphsLowLevelClient
+ localId mtls.Identity
+}
+
+var _ ephs.Client = &ephsllFrontendClient{}
+
+func newFrontendClient(baseCtx context.Context, ll EphsLowLevelClient, localId mtls.Identity) ephs.Client {
+ return &ephsllFrontendClient{
+ baseCtx: baseCtx,
+ ll: ll,
+ localId: localId,
+ }
+}
+
+// Get retrieves the content of a file at the specified path
+func (c *ephsllFrontendClient) Get(path string) (io.Reader, error) {
+ return c.GetContext(c.baseCtx, path)
+}
+
+// GetContext retrieves the content of a file at the specified path with context
+func (c *ephsllFrontendClient) GetContext(ctx context.Context, path string) (io.ReadCloser, error) {
+ ephsPath, err := ephs.ParsePath(path)
+ if err != nil {
+ return nil, err
+ }
+
+ entry, reader, err := c.ll.Get(ctx, ephsPath)
+ if err != nil {
+ return nil, err
+ }
+
+ // Return a wrapper that properly handles closing
+ return &ephsReadCloser{
+ reader: reader,
+ entry: entry,
+ }, nil
+}
+
+// Stat retrieves metadata about a file or directory at the specified path
+func (c *ephsllFrontendClient) Stat(path string) (*ephs_proto.FsEntry, error) {
+ return c.StatContext(c.baseCtx, path)
+}
+
+// StatContext retrieves metadata about a file or directory at the specified path with context
+func (c *ephsllFrontendClient) StatContext(ctx context.Context, path string) (*ephs_proto.FsEntry, error) {
+ ephsPath, err := ephs.ParsePath(path)
+ if err != nil {
+ return nil, err
+ }
+
+ return c.ll.Stat(ctx, ephsPath)
+}
+
+// Put writes content to a file at the specified path
+func (c *ephsllFrontendClient) Put(path string, size uint64, content io.Reader) (*ephs_proto.FsEntry, error) {
+ return c.PutContext(c.baseCtx, path, size, content)
+}
+
+// PutContext writes content to a file at the specified path with context
+func (c *ephsllFrontendClient) PutContext(ctx context.Context, path string, size uint64, content io.Reader) (*ephs_proto.FsEntry, error) {
+ ephsPath, err := ephs.ParsePath(path)
+ if err != nil {
+ return nil, err
+ }
+
+ return c.ll.Put(ctx, ephsPath, c.localId.Name(), 0, size, content)
+}
+
+// Delete removes a file or directory at the specified path
+func (c *ephsllFrontendClient) Delete(path string, recursive bool) error {
+ return c.DeleteContext(c.baseCtx, path, recursive)
+}
+
+// DeleteContext removes a file or directory at the specified path with context
+func (c *ephsllFrontendClient) DeleteContext(ctx context.Context, path string, recursive bool) error {
+ ephsPath, err := ephs.ParsePath(path)
+ if err != nil {
+ return err
+ }
+
+ return c.ll.Delete(ctx, ephsPath, recursive)
+}
+
+// MkDir creates a directory at the specified path
+func (c *ephsllFrontendClient) MkDir(path string, recursive bool) error {
+ return c.MkDirContext(c.baseCtx, path, recursive)
+}
+
+// MkDirContext creates a directory at the specified path with context
+func (c *ephsllFrontendClient) MkDirContext(ctx context.Context, path string, recursive bool) error {
+ ephsPath, err := ephs.ParsePath(path)
+ if err != nil {
+ return err
+ }
+
+ return c.ll.Mkdir(ctx, ephsPath, c.localId.Name(), recursive)
+}
+
+// Watch watches for changes to a file or directory at the specified path
+func (c *ephsllFrontendClient) Watch(ctx context.Context, path string) (<-chan *ephs_proto.WatchResponse, error) {
+ ephsPath, err := ephs.ParsePath(path)
+ if err != nil {
+ return nil, err
+ }
+
+ return c.ll.Watch(ctx, ephsPath)
+}
+
+// ephsReadCloser wraps an io.Reader to provide proper Close behavior
+type ephsReadCloser struct {
+ reader io.Reader
+ entry *ephs_proto.FsEntry
+ closed bool
+}
+
+func (r *ephsReadCloser) Read(p []byte) (int, error) {
+ if r.closed {
+ return 0, io.EOF
+ }
+ return r.reader.Read(p)
+}
+
+func (r *ephsReadCloser) Close() error {
+ if rc, ok := r.reader.(io.ReadCloser); ok {
+ r.closed = true
+ return rc.Close()
+ }
+ return nil
+}
--- /dev/null
+package ephsll
+
+import (
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+func IsAlreadyExists(err error) bool {
+ return errIsCode(err, codes.AlreadyExists)
+}
+
+func errIsCode(err error, code codes.Code) bool {
+ if err == nil {
+ return false
+ }
+
+ if st, ok := status.FromError(err); ok {
+ if st.Code() == code {
+ return true
+ }
+ }
+ return false
+}
package ephsll
import (
- "context"
"fmt"
"io"
"slices"
"go.fuhry.dev/runtime/mtls"
ephs_proto "go.fuhry.dev/runtime/proto/service/ephs"
"go.fuhry.dev/runtime/sd"
+ "go.fuhry.dev/runtime/utils/context"
"go.fuhry.dev/runtime/utils/log"
"go.fuhry.dev/runtime/utils/option"
)
type Option = option.Option[*ephsLowLevelClientImpl]
type EphsLowLevelClient interface {
+ HighLevelClient() ephs.Client
Stat(ctx context.Context, path ephs.Path) (*ephs_proto.FsEntry, error)
Get(ctx context.Context, path ephs.Path) (*ephs_proto.FsEntry, io.Reader, error)
Mkdir(ctx context.Context, path ephs.Path, owner string, recursive bool) error
return ll, nil
}
+func (c *ephsLowLevelClientImpl) HighLevelClient() ephs.Client {
+ ctx, _ := context.Interruptible()
+
+ return newFrontendClient(ctx, c, c.id)
+}
+
// Stat
func (c *ephsLowLevelClientImpl) Stat(ctx context.Context, path ephs.Path) (*ephs_proto.FsEntry, error) {
ent, err := c.getProtoForPath(ctx, path)
--- /dev/null
+package ephsll
+
+import (
+ "bytes"
+ "io"
+
+ "go.fuhry.dev/runtime/ephs"
+ . "gopkg.in/check.v1"
+)
+
+func (s *EphsLowLevelTestSuite) TestMkdir(c *C) {
+ path, err := ephs.ParsePath("/ephs/local/test_dir")
+ c.Assert(err, IsNil)
+
+ err = s.llc.Mkdir(s.ctx, path, s.id.Name(), true)
+ c.Assert(err, IsNil)
+}
+
+func (s *EphsLowLevelTestSuite) TestPut(c *C) {
+ path, err := ephs.ParsePath("/ephs/local/test_put")
+ c.Assert(err, IsNil)
+
+ err = s.llc.Mkdir(s.ctx, path.Parent(), s.id.Name(), true)
+ if err != nil {
+ c.Assert(IsAlreadyExists(err), Equals, true)
+ }
+
+ contents := bytes.NewBufferString("test ephs file contents")
+ fse, err := s.llc.Put(s.ctx, path, s.id.Name(), 0, uint64(contents.Len()), contents)
+ c.Assert(err, IsNil)
+ c.Assert(fse.GetContent().GetFile(), NotNil)
+}
+
+func (s *EphsLowLevelTestSuite) TestGet(c *C) {
+ path, err := ephs.ParsePath("/ephs/local/test_get")
+ c.Assert(err, IsNil)
+
+ err = s.llc.Mkdir(s.ctx, path.Parent(), s.id.Name(), true)
+ if err != nil {
+ c.Assert(IsAlreadyExists(err), Equals, true)
+ }
+
+ contents := []byte("test ephs file contents")
+ size := uint64(len(contents))
+ _, err = s.llc.Put(s.ctx, path, s.id.Name(), 1, size, bytes.NewBuffer(contents))
+ c.Assert(err, IsNil)
+
+ fse, reader, err := s.llc.Get(s.ctx, path)
+ c.Assert(err, IsNil)
+ c.Assert(fse.GetVersion(), Equals, uint64(1))
+ c.Assert(fse.GetContent(), NotNil)
+ c.Assert(fse.GetContent().GetFile(), NotNil)
+ c.Assert(fse.GetSize(), Equals, size)
+
+ readContents, err := io.ReadAll(reader)
+ c.Assert(err, IsNil)
+
+ c.Assert(readContents, DeepEquals, contents)
+}
return ephsDefaultCell
}
+func OverrideDefaultCell(cell string) {
+ ephsDefaultCell = cell
+}
+
func IsEphsPath(p string) bool {
if !strings.HasPrefix(p, KeyPrefix) {
return false
"//mtls",
"//mtls/fsnotify",
"//proto/service/ephs",
+ "//utils/context",
"//utils/log",
"//utils/option",
"//utils/stringmatch",
"errors"
"fmt"
"os"
+ "slices"
"gopkg.in/yaml.v3"
)
type AclRule struct {
- Principal *stringmatch.NewSyntaxMatchRule `yaml:"principal"`
- Key *stringmatch.NewSyntaxMatchRule `yaml:"key"`
- Invert bool `yaml:"invert"`
+ Principal *stringmatch.NewSyntaxMatchRule `yaml:"principal"`
+ Key *stringmatch.NewSyntaxMatchRule `yaml:"key"`
+ Permissions *PermissionString `yaml:"perm"`
+ Invert bool `yaml:"invert"`
}
type Acl struct {
Rules []*AclRule `yaml:"rules"`
}
+type PermissionString string
+type PermissionBit byte
+
+const (
+ PermissionRead PermissionBit = 'r'
+ PermissionWrite PermissionBit = 'w'
+)
+
+var permissionBytes = []byte{
+ byte(PermissionRead),
+ byte(PermissionWrite),
+}
+
+func (p *PermissionString) UnmarshalYAML(node *yaml.Node) error {
+ for _, b := range []byte(node.Value) {
+ if !slices.Contains(permissionBytes, b) {
+ return fmt.Errorf("unknown permission bit: %s", string(b))
+ }
+ }
+ *p = PermissionString(node.Value)
+ return nil
+}
+
+func (p *PermissionString) Has(b PermissionBit) bool {
+ return slices.Contains([]PermissionBit(*p), b)
+}
+
func LoadAcl(filePath string) (*Acl, error) {
fp, err := os.OpenFile(filePath, os.O_RDONLY, os.FileMode(0))
if err != nil {
return acl, nil
}
-func (r *AclRule) Match(principal, key string) bool {
+func (r *AclRule) Match(principal, key string, op PermissionBit) bool {
if !r.Principal.Match(principal) {
return false
}
keyMatcher = keyMatcher.Sub(vars)
- return keyMatcher.Match(key)
+ if !keyMatcher.Match(key) {
+ return false
+ }
+
+ if r.Permissions != nil && !r.Permissions.Has(op) {
+ return false
+ }
+
+ return true
}
-func (a *Acl) Check(principal, key string) bool {
+func (a *Acl) Check(principal, key string, op PermissionBit) bool {
for _, r := range a.Rules {
- if r.Match(principal, key) {
+ if r.Match(principal, key, op) {
return !r.Invert
}
}
exact: spiffe://domain.example.com/user/bob
key:
exact: "{{name}}/can/write/this"
+ perm: rw
- principal:
any: true
key:
- exact: spiffe://domain.example.com/user/suzie
key:
prefix: or/test/{{name}}/
+ perm: r
- principal:
prefix: spiffe://domain.example.com/service/
key:
exact: acl/{{name}}.yaml
+ perm: r
`
func TestAcl(t *testing.T) {
type testCase struct {
description, princ, key string
+ op PermissionBit
expect bool
}
testCases := []testCase{
{
- "bob has access to bob/can/write/this",
+ "bob has read access to bob/can/write/this",
"spiffe://domain.example.com/user/bob",
"bob/can/write/this",
+ PermissionRead,
+ true,
+ },
+ {
+ "bob has write access to bob/can/write/this",
+ "spiffe://domain.example.com/user/bob",
+ "bob/can/write/this",
+ PermissionWrite,
true,
},
{
"bob does not have access to suzie/can/write/this",
"spiffe://domain.example.com/user/bob",
"suzie/can/write/this",
+ PermissionRead,
false,
},
{
"suzie does not have access to suzie/can/write/this",
"spiffe://domain.example.com/user/suzie",
"suzie/can/write/this",
+ PermissionRead,
false,
},
{
"bob has access to writable/by/self/bob",
"spiffe://domain.example.com/user/bob",
"writable/by/self/bob",
+ PermissionRead,
true,
},
{
"suzie has access to writable/by/self/suzie",
"spiffe://domain.example.com/user/suzie",
"writable/by/self/suzie",
+ PermissionRead,
true,
},
{
"bob has access to writable/by/self/suzie",
"spiffe://domain.example.com/user/suzie",
"writable/by/self/suzie",
+ PermissionRead,
true,
},
{
"bob has access to or/test/bob/foo",
"spiffe://domain.example.com/user/bob",
"or/test/bob/foo",
+ PermissionRead,
true,
},
{
"suzie has access to or/test/suzie/foo",
"spiffe://domain.example.com/user/suzie",
"or/test/suzie/foo",
+ PermissionRead,
true,
},
{
"frank does not have access to or/test/frank/foo",
"spiffe://domain.example.com/user/frank",
"or/test/frank/foo",
+ PermissionRead,
false,
},
{
"bob does not have access no/rule/for/this",
"spiffe://domain.example.com/user/bob",
"no/rule/for/this",
+ PermissionRead,
false,
},
{
- "bob has access to acl/bob.yaml",
+ "bob has read access to acl/bob.yaml",
"spiffe://domain.example.com/service/bob",
"acl/bob.yaml",
+ PermissionRead,
true,
},
+ {
+ "bob does not have write access to acl/bob.yaml",
+ "spiffe://domain.example.com/service/bob",
+ "acl/bob.yaml",
+ PermissionWrite,
+ false,
+ },
{
"bob has no access to acl/frank.yaml",
"spiffe://domain.example.com/service/bob",
"acl/frank.yaml",
+ PermissionRead,
false,
},
}
for i, tc := range testCases {
- assert.Equal(tc.expect, acl.Check(tc.princ, tc.key),
+ assert.Equal(tc.expect, acl.Check(tc.princ, tc.key, tc.op),
"test case %d: %s", i, tc.description)
}
}
import (
"bytes"
- "context"
"errors"
"io"
"go.fuhry.dev/runtime/mtls"
"go.fuhry.dev/runtime/mtls/fsnotify"
ephs_proto "go.fuhry.dev/runtime/proto/service/ephs"
+ "go.fuhry.dev/runtime/utils/context"
"go.fuhry.dev/runtime/utils/log"
"go.fuhry.dev/runtime/utils/option"
)
GetPath() string
}
+type EphsServer interface {
+ ephs_proto.EphsServer
+
+ // InstallClient creates a client that implements the ephs.Client interface,
+ // but using the low-level client directly instead of the gRPC client. The
+ // default ephs client is overridden to use this implementation.
+ //
+ // This is used to solve the circular dependency of "to use ephs, ephs must
+ // be available" - allowing functionality like retrieving ACLs from ephs even
+ // if there are no currently healthy ephs servers.
+ InstallClient()
+}
+
type ephsServicer struct {
ephs_proto.EphsServer
})
}
-func NewEphsServicer(opts ...Option) (ephs_proto.EphsServer, error) {
+func NewEphsServicer(opts ...Option) (EphsServer, error) {
serv := &ephsServicer{
logger: log.Default().WithPrefix("ephsServicer"),
}
return serv, nil
}
-func (s *ephsServicer) checkPath(ctx context.Context, p string) (ephs.Path, error) {
+func (s *ephsServicer) InstallClient() {
+ fec := s.ll.HighLevelClient()
+ ephs.OverrideDefaultClient(fec)
+}
+
+func (s *ephsServicer) checkPath(ctx context.Context, p string, opType PermissionBit) (ephs.Path, error) {
ephsPath, err := ephs.ParsePath(p)
if err != nil {
return nil, status.Errorf(
}
if s.acl != nil {
- if !s.acl.Check(ident, ephsPath.AclPath()) {
+ if !s.acl.Check(ident, ephsPath.AclPath(), opType) {
return nil, status.Errorf(
codes.PermissionDenied,
"access to path %q denied for %q (rule path: %q)",
return ephsPath, nil
}
+func (s *ephsServicer) logRequest(ctx context.Context, method string, req msgWithPath) {
+ id, err := identityFromContext(ctx)
+ if err != nil {
+ return
+ }
+
+ s.logger.AppendPrefix("]["+id).Infof("%s(%q)", method, req.GetPath())
+}
+
func (s *ephsServicer) Stat(ctx context.Context, req *ephs_proto.GetRequest) (*ephs_proto.StatResponse, error) {
s.logRequest(ctx, "Stat", req)
- ephsPath, err := s.checkPath(ctx, req.Path)
+ ephsPath, err := s.checkPath(ctx, req.Path, PermissionRead)
if err != nil {
return nil, err
}
}, nil
}
-func (s *ephsServicer) logRequest(ctx context.Context, method string, req msgWithPath) {
- id, err := identityFromContext(ctx)
- if err != nil {
- return
- }
-
- s.logger.AppendPrefix("]["+id).Infof("%s(%q)", method, req.GetPath())
-}
-
func (s *ephsServicer) Get(req *ephs_proto.GetRequest, server ephs_proto.Ephs_GetServer) error {
ctx := server.Context()
s.logRequest(ctx, "Get", req)
- ephsPath, err := s.checkPath(ctx, req.Path)
+ ephsPath, err := s.checkPath(ctx, req.Path, PermissionRead)
if err != nil {
return err
}
entry, reader, err := s.ll.Get(ctx, ephsPath)
if err != nil {
if errors.Is(err, ephsll.ErrDirectory) {
- return status.Errorf(
+ err = status.Errorf(
codes.FailedPrecondition,
"%q: is a directory",
req.Path)
}
func (s *ephsServicer) MkDir(ctx context.Context, req *ephs_proto.MkDirRequest) (*emptypb.Empty, error) {
- path, err := s.checkPath(ctx, req.Path)
+ path, err := s.checkPath(ctx, req.Path, PermissionWrite)
if err != nil {
return nil, err
}
whoami, _ := identityFromContext(ctx)
- return &emptypb.Empty{}, s.ll.Mkdir(ctx, path, whoami, req.Recursive)
+ err = s.ll.Mkdir(ctx, path, whoami, req.Recursive)
+ return &emptypb.Empty{}, err
}
func (s *ephsServicer) Delete(ctx context.Context, req *ephs_proto.DeleteRequest) (*emptypb.Empty, error) {
s.logRequest(ctx, "Delete", req)
- ephsPath, err := s.checkPath(ctx, req.Path)
+ ephsPath, err := s.checkPath(ctx, req.Path, PermissionWrite)
if err != nil {
return nil, err
}
- if err := s.ll.Delete(ctx, ephsPath, req.Recursive); err != nil {
+ err = s.ll.Delete(ctx, ephsPath, req.Recursive)
+ if err != nil {
return nil, err
}
return err
}
- ephsPath, err := s.checkPath(ctx, msg.Path)
+ ephsPath, err := s.checkPath(ctx, msg.Path, PermissionWrite)
if err != nil {
return err
}
response := &ephs_proto.StatResponse{
Entry: ent,
}
- return server.SendAndClose(response)
+ err = server.SendAndClose(response)
+ return err
}
func (s *ephsServicer) Watch(req *ephs_proto.GetRequest, server ephs_proto.Ephs_WatchServer) error {
ctx := server.Context()
s.logRequest(ctx, "Watch", req)
- ephsPath, err := s.checkPath(ctx, req.Path)
+ ephsPath, err := s.checkPath(ctx, req.Path, PermissionRead)
if err != nil {
return err
}