From: Dan Fuhry Date: Sat, 22 Nov 2025 03:01:18 +0000 (-0500) Subject: [ephs] split server into servicer and low-level client X-Git-Url: https://go.fuhry.dev/?a=commitdiff_plain;ds=inline;p=runtime.git [ephs] split server into servicer and low-level client Make the ephs servicer act as a front-end to the ephs low-level client library, which handles all direct interaction with etcd. - Move all ephs etcd interaction from `ephs/servicer` to `ephs/ephsll` - Add ephs.Path type to make ephs path transformations simpler - Refactor grpc's `WithStaticAddress` option to support DNS names --- diff --git a/cmd/ephs_client/main.go b/cmd/ephs_client/main.go index da5aeaa..93b4f6d 100644 --- a/cmd/ephs_client/main.go +++ b/cmd/ephs_client/main.go @@ -379,7 +379,10 @@ func main() { }, } - if err := cmd.Run(ctx, os.Args); err != nil { + args := os.Args[:1] + args = append(args, flag.Args()...) + + if err := cmd.Run(ctx, args); err != nil { log.Fatal(err) os.Exit(1) } diff --git a/cmd/ephs_server/main.go b/cmd/ephs_server/main.go index 288696e..79685cc 100644 --- a/cmd/ephs_server/main.go +++ b/cmd/ephs_server/main.go @@ -2,6 +2,7 @@ package main import ( "flag" + "os" "go.fuhry.dev/runtime/ephs/servicer" "go.fuhry.dev/runtime/grpc" @@ -16,6 +17,8 @@ import ( func main() { var err error + mtls.SetDefaultIdentity("ephs") + acl := flag.String("ephs.acl", "", "YAML file containing ACLs for ephs") awsFile := flag.String("ephs.s3-creds-file", "", "file to load AWS S3 credentials from") awsEnv := flag.Bool("ephs.s3-creds-env", false, "set to true to load AWS credentials from environment") @@ -25,7 +28,8 @@ func main() { serverIdentity := mtls.DefaultIdentity() s, err := grpc.NewGrpcServer(serverIdentity, grpc.WithTransport(&grpc.QUICConnectionFactory{})) if err != nil { - panic(err) + log.Fatal(err) + os.Exit(1) } var opts []servicer.Option @@ -42,6 +46,7 @@ func main() { serv, err := servicer.NewEphsServicer(opts...) if err != nil { log.Fatal(err) + os.Exit(1) } ctx, cancel := context.Interruptible() @@ -51,7 +56,8 @@ func main() { ephs_pb.RegisterEphsServer(s, serv) }) if err != nil { - panic(err) + log.Fatal(err) + os.Exit(1) } defer s.Stop() diff --git a/cmd/grpc_health_probe/main.go b/cmd/grpc_health_probe/main.go index 244d9eb..be9dec0 100644 --- a/cmd/grpc_health_probe/main.go +++ b/cmd/grpc_health_probe/main.go @@ -2,9 +2,7 @@ package main import ( "flag" - "net" "os" - "strconv" "time" "go.fuhry.dev/runtime/grpc" @@ -33,24 +31,7 @@ func main() { var opts []grpc.ClientOption if *serverAddr != "" { - host, port, err := net.SplitHostPort(*serverAddr) - if err != nil { - log.Default().Panic(err) - } - - portInt, err := strconv.Atoi(port) - if err != nil { - portInt, err = net.LookupPort("tcp", port) - if err != nil { - log.Default().Panic(err) - } - } - - if ip := net.ParseIP(host); ip == nil { - log.Default().Panicf("%q: not a valid IPv4 or IPv6 address", host) - } else { - opts = append(opts, grpc.WithStaticAddress(&net.TCPAddr{ip, portInt, ""})) - } + opts = append(opts, grpc.WithStaticAddress(*serverAddr)) } ctx, cancel := context.Interruptible() diff --git a/ephs/BUILD.bazel b/ephs/BUILD.bazel index 8c1413f..e4f36dd 100644 --- a/ephs/BUILD.bazel +++ b/ephs/BUILD.bazel @@ -2,7 +2,10 @@ load("@rules_go//go:def.bzl", "go_library") go_library( name = "ephs", - srcs = ["client.go"], + srcs = [ + "client.go", + "path.go", + ], importpath = "go.fuhry.dev/runtime/ephs", visibility = ["//visibility:public"], deps = [ @@ -11,6 +14,7 @@ go_library( "//mtls", "//proto/service/ephs", "//utils/context", + "//utils/hostname", "//utils/log", "//utils/option", "@com_github_quic_go_quic_go//:quic-go", diff --git a/ephs/client.go b/ephs/client.go index d4eac3b..5a1943d 100644 --- a/ephs/client.go +++ b/ephs/client.go @@ -1,3 +1,13 @@ +// Package ephs is the public-facing, high-level client library for interacting with ephs. +// +// ephs (pronounced as "effs") is "ephemeral filesystem." It is a filesystem implemented on top of +// etcd and S3-compatible object storage, with object storage being used for files above a certain +// size threshold. Presently this is 256 KiB or one chunk. +// +// Secure deployment of ephs requires either a dedicated etcd instance or careful configuration of +// etcd access such that reads and writes of keys beginning with `/ephs/` are restricted to the ephs +// gRPC service. + package ephs import ( @@ -23,6 +33,7 @@ import ( ) const KeyPrefix = "/ephs/" +const PathSeparator = "/" const ChunkSize = 262144 const formatEntryDateFormat = "Monday, 2 Jan 2006 15:04:05 -0700" @@ -55,8 +66,9 @@ type clientImpl struct { defaultTimeout time.Duration id mtls.Identity - client grpc.Client - conn *grpc.ClientConn + grpcOpts []grpc.ClientOption + client grpc.Client + conn *grpc.ClientConn } type getReader struct { @@ -74,13 +86,11 @@ var ephsQuicConfig = &quic.Config{ KeepAlivePeriod: 5 * time.Second, } +var defaultClientAddr string + var defaultClient Client var defaultClientMu sync.Mutex -func IsEphsPath(p string) bool { - return strings.HasPrefix(p, KeyPrefix) -} - type notFoundError struct { ephsPath string } @@ -162,6 +172,13 @@ func WithDefaultTimeout(d time.Duration) ClientOption { }) } +func withGrpcClientOptions(opts ...grpc.ClientOption) ClientOption { + return option.NewOption(func(c *clientImpl) error { + c.grpcOpts = append(c.grpcOpts, opts...) + return nil + }) +} + func DefaultClient() (Client, error) { defaultClientMu.Lock() defer defaultClientMu.Unlock() @@ -174,7 +191,13 @@ func DefaultClient() (Client, error) { DefaultClientContext, _ = context.Interruptible() } - client, err := NewClient(DefaultClientContext, mtls.DefaultIdentity()) + var opts []ClientOption + if defaultClientAddr != "" { + addrs := strings.Split(defaultClientAddr, ",") + opts = append(opts, withGrpcClientOptions(grpc.WithStaticAddress(addrs...))) + } + + client, err := NewClient(DefaultClientContext, mtls.DefaultIdentity(), opts...) if err != nil { return nil, err } @@ -188,6 +211,11 @@ func NewClient(ctx context.Context, localId mtls.Identity, opts ...ClientOption) defaultCtx: ctx, defaultTimeout: 15 * time.Second, id: localId, + grpcOpts: []grpc.ClientOption{ + grpc.WithConnectionFactory(&grpc_common.QUICConnectionFactory{ + QUICConfig: ephsQuicConfig.Clone(), + }), + }, } for _, opt := range opts { @@ -447,10 +475,7 @@ func (c *clientImpl) grpcClient() (ephs_pb.EphsClient, error) { var err error if c.client == nil { - c.client, err = grpc.NewGrpcClient(c.defaultCtx, "ephs", c.id, - grpc.WithConnectionFactory(&grpc_common.QUICConnectionFactory{ - QUICConfig: ephsQuicConfig.Clone(), - })) + c.client, err = grpc.NewGrpcClient(c.defaultCtx, "ephs", c.id, c.grpcOpts...) if err != nil { return nil, fmt.Errorf("error creating grpc client: %T: %v", err, err) } diff --git a/ephs/ephsll/BUILD.bazel b/ephs/ephsll/BUILD.bazel new file mode 100644 index 0000000..8601f28 --- /dev/null +++ b/ephs/ephsll/BUILD.bazel @@ -0,0 +1,29 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "ephsll", + srcs = [ + "fs_object.go", + "low_level_client.go", + "s3.go", + ], + importpath = "go.fuhry.dev/runtime/ephs/ephsll", + visibility = ["//visibility:public"], + deps = [ + "//constants", + "//ephs", + "//mtls", + "//proto/service/ephs", + "//sd", + "//utils/log", + "//utils/option", + "@com_github_minio_minio_go_v7//:minio-go", + "@com_github_minio_minio_go_v7//pkg/credentials", + "@io_etcd_go_etcd_client_v3//:client", + "@io_etcd_go_etcd_client_v3//concurrency", + "@org_golang_google_grpc//codes", + "@org_golang_google_grpc//status", + "@org_golang_google_protobuf//proto", + "@org_golang_google_protobuf//types/known/timestamppb", + ], +) diff --git a/ephs/servicer/fs_object.go b/ephs/ephsll/fs_object.go similarity index 73% rename from ephs/servicer/fs_object.go rename to ephs/ephsll/fs_object.go index 94df6c2..92ef2d9 100644 --- a/ephs/servicer/fs_object.go +++ b/ephs/ephsll/fs_object.go @@ -1,4 +1,4 @@ -package servicer +package ephsll import ( "bytes" @@ -19,8 +19,9 @@ import ( const LargeFileThreshold = ephs.ChunkSize -type FsEntry struct { +type fsEntryWrapper struct { *ephs_proto.FsEntry + s3Client *minio.Client } var ErrDirectory = errors.New("is a directory") @@ -28,20 +29,24 @@ var ErrUnknownCompression = errors.New("file compressed with unknown scheme") var RegexpLargeFileKey = regexp.MustCompile(`^[0-9A-Za-z_-]{64}$`) -func (e *FsEntry) GetContents(ctx context.Context, s3 *minio.Client) (io.Reader, error) { +func (e *fsEntryWrapper) Proto() *ephs_proto.FsEntry { + return e.FsEntry +} + +func (e *fsEntryWrapper) GetContents(ctx context.Context) (io.Reader, error) { switch { case e.FsEntry.Content.GetDirectory() != nil: return nil, ErrDirectory case e.FsEntry.Content.GetFile() != nil: return e.readFile() case e.FsEntry.Content.GetLargeFile() != nil: - return e.readLargeFile(ctx, s3) + return e.readLargeFile(ctx) default: return nil, errors.New("file content not populated with any known type") } } -func (e *FsEntry) SetContents(ctx context.Context, s3 *minio.Client, r io.Reader, size uint64) error { +func (e *fsEntryWrapper) SetContents(ctx context.Context, r io.Reader, size uint64) error { if e.FsEntry.Content.GetDirectory() != nil { return ErrDirectory } @@ -54,7 +59,7 @@ func (e *FsEntry) SetContents(ctx context.Context, s3 *minio.Client, r io.Reader var err error if size >= LargeFileThreshold { - err = e.writeLargeFile(ctx, s3, r, size) + err = e.writeLargeFile(ctx, r, size) } else { err = e.writeFile(r, size) } @@ -65,7 +70,7 @@ func (e *FsEntry) SetContents(ctx context.Context, s3 *minio.Client, r io.Reader if mustCleanupLargeFile { log.Default().Infof("mustCleanupLargeFile: %+v", oldLargeFile) - err = e.cleanupOldLargeFile(ctx, s3, &oldLargeFile) + err = e.cleanupOldLargeFile(ctx, &oldLargeFile) if err != nil { log.Default().Alert( "error while cleaning up old large file with key %q: %v", @@ -77,7 +82,7 @@ func (e *FsEntry) SetContents(ctx context.Context, s3 *minio.Client, r io.Reader return nil } -func (e *FsEntry) DeleteContents(ctx context.Context, s3 *minio.Client) error { +func (e *fsEntryWrapper) DeleteContents(ctx context.Context) error { switch { case e.FsEntry.Content.GetDirectory() != nil: return ErrDirectory @@ -85,13 +90,13 @@ func (e *FsEntry) DeleteContents(ctx context.Context, s3 *minio.Client) error { e.FsEntry.Content = nil return nil case e.FsEntry.Content.GetLargeFile() != nil: - return e.cleanupOldLargeFile(ctx, s3, e.Content.GetLargeFile()) + return e.cleanupOldLargeFile(ctx, e.Content.GetLargeFile()) default: return errors.New("file content not populated with any known type") } } -func (e *FsEntry) readFile() (io.Reader, error) { +func (e *fsEntryWrapper) readFile() (io.Reader, error) { file := e.FsEntry.Content.GetFile() if file == nil { return nil, errors.New("internal error: readFile() called but file is unpopulated") @@ -107,7 +112,11 @@ func (e *FsEntry) readFile() (io.Reader, error) { } } -func (e *FsEntry) readLargeFile(ctx context.Context, s3 *minio.Client) (io.Reader, error) { +func (e *fsEntryWrapper) readLargeFile(ctx context.Context) (io.Reader, error) { + if e.s3Client == nil { + return nil, errors.New("s3 client not initialized, unable to read large file") + } + if e.Content == nil || e.Content.GetLargeFile() == nil { return nil, errors.New("not a large file") } @@ -117,10 +126,10 @@ func (e *FsEntry) readLargeFile(ctx context.Context, s3 *minio.Client) (io.Reade return nil, errors.New("large file key contains invalid characters or is the wrong length") } - return s3.GetObject(ctx, s3Bucket, s3Prefix+key, minio.GetObjectOptions{}) + return e.s3Client.GetObject(ctx, s3Bucket, s3Prefix+key, minio.GetObjectOptions{}) } -func (e *FsEntry) writeFile(r io.Reader, size uint64) error { +func (e *fsEntryWrapper) writeFile(r io.Reader, size uint64) error { var compression = ephs_proto.FsEntry_File_GZIP if f := e.FsEntry.Content.GetFile(); f != nil { @@ -162,11 +171,15 @@ func (e *FsEntry) writeFile(r io.Reader, size uint64) error { return nil } -func (e *FsEntry) writeLargeFile(ctx context.Context, s3 *minio.Client, r io.Reader, size uint64) error { +func (e *fsEntryWrapper) writeLargeFile(ctx context.Context, r io.Reader, size uint64) error { + if e.s3Client == nil { + return errors.New("s3 client not initialized") + } + key := newLargeFileKey() log.Default().Noticef("attempting to PutObject into bucket %s at key %s", s3Bucket, key) - upload, err := s3.PutObject(ctx, s3Bucket, s3Prefix+key, r, int64(size), minio.PutObjectOptions{}) + upload, err := e.s3Client.PutObject(ctx, s3Bucket, s3Prefix+key, r, int64(size), minio.PutObjectOptions{}) if err != nil { log.Default().Error(err) return err @@ -185,13 +198,16 @@ func (e *FsEntry) writeLargeFile(ctx context.Context, s3 *minio.Client, r io.Rea return nil } -func (e *FsEntry) cleanupOldLargeFile(ctx context.Context, s3 *minio.Client, lf *ephs_proto.FsEntry_LargeFile) error { +func (e *fsEntryWrapper) cleanupOldLargeFile(ctx context.Context, lf *ephs_proto.FsEntry_LargeFile) error { + if e.s3Client == nil { + return errors.New("s3 client not initialized") + } key := lf.GetKey() if !RegexpLargeFileKey.MatchString(key) { return errors.New("large file key contains invalid characters or is the wrong length") } - return s3.RemoveObject(ctx, s3Bucket, s3Prefix+key, minio.RemoveObjectOptions{}) + return e.s3Client.RemoveObject(ctx, s3Bucket, s3Prefix+key, minio.RemoveObjectOptions{}) } func newLargeFileKey() string { diff --git a/ephs/ephsll/low_level_client.go b/ephs/ephsll/low_level_client.go new file mode 100644 index 0000000..8fd085f --- /dev/null +++ b/ephs/ephsll/low_level_client.go @@ -0,0 +1,504 @@ +// Package ephsll is the low-level client for ephs. +// +// The low-level client handles all direct interaction with etcd. + +package ephsll + +import ( + "context" + "fmt" + "io" + "slices" + "sync" + + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" + etcd_client "go.etcd.io/etcd/client/v3" + etcd_concurrency "go.etcd.io/etcd/client/v3/concurrency" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" + + "go.fuhry.dev/runtime/ephs" + "go.fuhry.dev/runtime/mtls" + ephs_proto "go.fuhry.dev/runtime/proto/service/ephs" + "go.fuhry.dev/runtime/sd" + "go.fuhry.dev/runtime/utils/log" + "go.fuhry.dev/runtime/utils/option" +) + +type Option = option.Option[*ephsLowLevelClientImpl] + +type EphsLowLevelClient interface { + Stat(ctx context.Context, path ephs.Path) (*ephs_proto.FsEntry, error) + Get(ctx context.Context, path ephs.Path) (*ephs_proto.FsEntry, io.Reader, error) + Mkdir(ctx context.Context, path ephs.Path, owner string, recursive bool) error + Delete(ctx context.Context, path ephs.Path, recursive bool) error + Put(ctx context.Context, path ephs.Path, owner string, version, size uint64, contents io.Reader) (*ephs_proto.FsEntry, error) + Watch(ctx context.Context, path ephs.Path) (<-chan *ephs_proto.WatchResponse, error) +} + +type ephsLowLevelClientImpl struct { + id mtls.Identity + logger log.Logger + clientMu sync.Mutex + clients map[string]*etcd_client.Client + s3Creds *credentials.Credentials + s3Client *minio.Client +} + +var _ EphsLowLevelClient = &ephsLowLevelClientImpl{} + +func NewEphsLowLevelClient(id mtls.Identity, opts ...Option) (EphsLowLevelClient, error) { + ll := &ephsLowLevelClientImpl{ + id: id, + logger: log.WithPrefix("EphsLowLevelClient"), + clients: make(map[string]*etcd_client.Client), + } + + for _, opt := range opts { + if err := opt.Apply(ll); err != nil { + return nil, err + } + } + + if _, err := ll.clientForCell(ephs.DefaultCell()); err != nil { + return nil, err + } + + s3, err := ll.newS3Client() + if err != nil { + return nil, err + } + ll.s3Client = s3 + + return ll, nil +} + +// Stat +func (c *ephsLowLevelClientImpl) Stat(ctx context.Context, path ephs.Path) (*ephs_proto.FsEntry, error) { + ent, err := c.getProtoForPath(ctx, path) + if err != nil { + return nil, err + } + + // Never send file contents in a stat response + if _, ok := ent.Content.GetContent().(*ephs_proto.FsEntry_Content_File); ok { + ent.Content = &ephs_proto.FsEntry_Content{ + Content: &ephs_proto.FsEntry_Content_File{ + File: &ephs_proto.FsEntry_File{}, + }, + } + } + + return ent, nil +} + +// Get +func (c *ephsLowLevelClientImpl) Get(ctx context.Context, path ephs.Path) (*ephs_proto.FsEntry, io.Reader, error) { + ent, err := c.getProtoForPath(ctx, path) + if err != nil { + return nil, nil, err + } + + if ent.Content.GetDirectory() != nil { + return nil, nil, ErrDirectory + } + + reader, err := (&fsEntryWrapper{FsEntry: ent, s3Client: c.s3Client}).GetContents(ctx) + if err != nil { + return nil, nil, err + } + + return ent, reader, nil +} + +// Mkdir +func (c *ephsLowLevelClientImpl) Mkdir(ctx context.Context, path ephs.Path, owner string, recursive bool) error { + return c.mkdir(ctx, path, owner, recursive) +} + +// Delete +func (c *ephsLowLevelClientImpl) Delete(ctx context.Context, ephsPath ephs.Path, recursive bool) error { + entry, err := c.getProtoForPath(ctx, ephsPath) + if err != nil { + return err + } + + fse := &fsEntryWrapper{FsEntry: entry, s3Client: c.s3Client} + if fse.Content.GetDirectory() != nil { + if len(fse.Content.GetDirectory().Entries) > 0 && !recursive { + return status.Errorf(codes.FailedPrecondition, + "refusing to delete non-empty directory %q without recursion flag set", + ephsPath.String()) + } + + for _, ent := range fse.Content.GetDirectory().Entries { + if err := c.Delete(ctx, ephsPath.Child(ent.Name), recursive); err != nil { + return err + } + } + } else { + if err := fse.DeleteContents(ctx); err != nil { + return err + } + } + + err = c.modifyDirectory(ctx, ephsPath.Parent(), entry.Owner, func(dEnt *ephs_proto.FsEntry_Directory, stm etcd_concurrency.STM) error { + index := -1 + for i, ent := range dEnt.Entries { + if ent.Name == ephsPath.Basename() { + index = i + break + } + } + if index >= 0 { + dEnt.Entries = slices.Delete(dEnt.Entries, index, index+1) + } + stm.Del(ephsPath.EtcdKey()) + return nil + }) + + return err +} + +// Put +func (c *ephsLowLevelClientImpl) Put(ctx context.Context, ephsPath ephs.Path, owner string, version, size uint64, contents io.Reader) (*ephs_proto.FsEntry, error) { + _, err := c.getProtoForPath(ctx, ephsPath.Parent()) + if err != nil { + if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound { + return nil, status.Errorf( + codes.FailedPrecondition, + "cannot write %q: parent directory %q does not exist", + ephsPath.String(), ephsPath.Parent().String()) + } + return nil, err + } + + pbEnt, err := c.getProtoForPath(ctx, ephsPath) + if err != nil { + if status, ok := status.FromError(err); ok && status.Code() == codes.NotFound { + // NotFound is only returned after all access checks have succeeded. Ok to + // create file. + now := timestamppb.Now() + pbEnt = &ephs_proto.FsEntry{ + Created: now, + Modified: now, + Version: version - 1, + Size: size, + Owner: owner, + } + } else { + // some other error besides not-found + return nil, err + } + } else { + if version != 0 && version != (pbEnt.Version+1) { + return nil, status.Errorf( + codes.FailedPrecondition, + "version conflict: attempted to overwrite version %d with %d, "+ + "version number must be current version + 1", + pbEnt.Version, + version) + } + } + ent := &fsEntryWrapper{FsEntry: pbEnt, s3Client: c.s3Client} + if pbEnt.Content.GetDirectory() != nil { + return nil, status.Errorf(codes.FailedPrecondition, + "cannot write %q: path already exists and is a directory", + ephsPath.String()) + } + + if err := ent.SetContents(ctx, contents, size); err != nil { + return nil, err + } + + ent.FsEntry.Modified = timestamppb.Now() + ent.FsEntry.Size = size + ent.FsEntry.Version++ + + marshaled, err := proto.Marshal(ent.FsEntry) + if err != nil { + return nil, err + } + + err = c.modifyDirectory(ctx, ephsPath.Parent(), owner, func(dEnt *ephs_proto.FsEntry_Directory, stm etcd_concurrency.STM) error { + stm.Put(ephsPath.EtcdKey(), string(marshaled)) + for _, ent := range dEnt.Entries { + if ent.Name == ephsPath.Basename() { + return nil + } + } + + dEnt.Entries = append(dEnt.Entries, &ephs_proto.FsEntry_Directory_DirectoryEntry{ + Name: ephsPath.Basename(), + Directory: false, + }) + + return nil + }) + + if err != nil { + _ = ent.DeleteContents(ctx) + return nil, err + } + + return ent.FsEntry, nil +} + +// Watch +func (c *ephsLowLevelClientImpl) Watch(ctx context.Context, ephsPath ephs.Path) (<-chan *ephs_proto.WatchResponse, error) { + ctx = etcd_client.WithRequireLeader(ctx) + resp := &ephs_proto.WatchResponse{ + Entry: nil, + Event: ephs_proto.WatchResponse_DELETE, + } + + if fse, err := c.getProtoForPath(ctx, ephsPath); err == nil { + resp.Entry = fse + resp.Event = ephs_proto.WatchResponse_CREATE + } + + etcd, err := c.clientForCell(ephsPath.Cell()) + if err != nil { + return nil, err + } + + ch := make(chan *ephs_proto.WatchResponse) + + go c.watch(ctx, ephsPath, ch, resp, etcd) + + return ch, nil +} + +func (c *ephsLowLevelClientImpl) watch(ctx context.Context, ephsPath ephs.Path, ch chan *ephs_proto.WatchResponse, resp *ephs_proto.WatchResponse, etcd *etcd_client.Client) { + defer close(ch) + + ch <- resp + + for ctx.Err() == nil { + watcher := etcd.Watch(ctx, ephsPath.EtcdKey()) + for msg := range watcher { + c.logger.Debugf("watcher: got event: %+v", msg) + for _, event := range msg.Events { + if string(event.Kv.Key) != ephsPath.EtcdKey() { + continue + } + + resp.Entry = &ephs_proto.FsEntry{} + + if event.IsCreate() { + resp.Event = ephs_proto.WatchResponse_CREATE + } else if event.IsModify() { + resp.Event = ephs_proto.WatchResponse_MODIFY + } else if event.Type == etcd_client.EventTypeDelete { + resp.Entry = nil + resp.Event = ephs_proto.WatchResponse_DELETE + } else { + continue + } + + if resp.Entry != nil { + if err := proto.Unmarshal(event.Kv.Value, resp.Entry); err != nil { + c.logger.Errorf("protobuf unmarshal error: %v", err) + return + } + } + + ch <- resp + } + } + } +} + +func (c *ephsLowLevelClientImpl) mkdir(ctx context.Context, ephsPath ephs.Path, owner string, recursive bool) error { + etcd, err := c.clientForCell(ephsPath.Cell()) + if err != nil { + return err + } + + txn, err := etcd_concurrency.NewSTM(etcd, func(stm etcd_concurrency.STM) error { + var ( + dir *ephs_proto.FsEntry = newDir(owner) + err error + ) + for _, ancestor := range ephsPath.Lineage() { + parent := ancestor.Parent() + if ancestor == ancestor.Parent() { + continue + } + dirRaw := []byte(stm.Get(parent.EtcdKey())) + if len(dirRaw) == 0 { + if !recursive { + return status.Errorf(codes.NotFound, + "cannot mkdir %q: parent directory %q does not exist and recursion is disabled", + ephsPath.String(), parent) + } + + dir = newDir(owner) + } else { + err = proto.Unmarshal(dirRaw, dir) + if err != nil { + return status.Errorf(codes.Internal, + "error unmarshaling parent directory %q: %T: %v", + parent, err, err) + } + + if dir.Content.GetDirectory() == nil { + return status.Errorf( + codes.FailedPrecondition, + "cannot mkdir %q: parent item %q is not a directory (raw len: %d)", + ephsPath.String(), parent, len(dirRaw)) + } + + dir.Modified = timestamppb.Now() + dir.Version += 1 + } + + dEnt := dir.Content.GetDirectory() + + dEnt.Entries = append(dEnt.Entries, &ephs_proto.FsEntry_Directory_DirectoryEntry{ + Name: ancestor.Basename(), + Directory: true, + }) + + dirRaw, err = proto.Marshal(dir) + if err != nil { + return status.Errorf(codes.Internal, + "error marshaling parent directory %q: %T: %v", + parent, err, err) + } + + stm.Put(parent.EtcdKey(), string(dirRaw)) + } + + entry := []byte(stm.Get(ephsPath.EtcdKey())) + if len(entry) > 0 { + return status.Errorf(codes.AlreadyExists, + "cannot create directory %q: already exists", + ephsPath.String()) + } + + dirRaw, err := proto.Marshal(newDir(owner)) + if err != nil { + return status.Errorf(codes.Internal, + "error marshaling new directory %q: %T: %v", + ephsPath.String(), err, err) + } + + stm.Put(ephsPath.EtcdKey(), string(dirRaw)) + return nil + }, etcd_concurrency.WithAbortContext(ctx)) + if err != nil { + return err + } + if !txn.Succeeded { + return fmt.Errorf("transaction failed: %+v", txn) + } + + return nil +} + +func (c *ephsLowLevelClientImpl) modifyDirectory(ctx context.Context, ephsPath ephs.Path, owner string, apply func(*ephs_proto.FsEntry_Directory, etcd_concurrency.STM) error) (err error) { + etcd, err := c.clientForCell(ephsPath.Cell()) + if err != nil { + return err + } + + txnResp, err := etcd_concurrency.NewSTM(etcd, func(stm etcd_concurrency.STM) error { + dirRaw := []byte(stm.Get(ephsPath.EtcdKey())) + dir := newDir(owner) + if len(dirRaw) > 0 { + if err := proto.Unmarshal(dirRaw, dir); err != nil { + return fmt.Errorf("error unmarshaling FsEntry at %q: %v", ephsPath.String(), err) + } + } + + if dir.Content.GetDirectory() == nil { + return fmt.Errorf("ephs path %q: not a directory", ephsPath.String()) + } + + if err = apply(dir.Content.GetDirectory(), stm); err != nil { + return fmt.Errorf("error performing atomic directory modification: %v", err) + } + + dir.Modified = timestamppb.Now() + dir.Version += 1 + + if dirRaw, err = proto.Marshal(dir); err != nil { + return fmt.Errorf("error re-marshaling FsEntry at %q after calling apply: %v", ephsPath.String(), err) + } + + stm.Put(ephsPath.EtcdKey(), string(dirRaw)) + + return nil + }, etcd_concurrency.WithAbortContext(ctx)) + + c.logger.Debugf("transaction status modifying directory %q: %+v", ephsPath.String(), txnResp) + + return err +} + +func (c *ephsLowLevelClientImpl) getProtoForPath(ctx context.Context, path ephs.Path) (*ephs_proto.FsEntry, error) { + var err error + etcd, err := c.clientForCell(path.Cell()) + if err != nil { + return nil, err + } + + obj, err := etcd.Get(ctx, path.EtcdKey()) + if err != nil { + return nil, err + } + + if len(obj.Kvs) < 1 { + return nil, status.Error( + codes.NotFound, + path.String()) + } + + entry := &ephs_proto.FsEntry{} + if err := proto.Unmarshal(obj.Kvs[0].Value, entry); err != nil { + return nil, status.Errorf( + codes.DataLoss, + "failed to unmarshal key %q as %s: %v", + obj.Kvs[0].Key, + proto.MessageName(entry), + err) + } + + return entry, nil +} + +func (c *ephsLowLevelClientImpl) clientForCell(cell string) (*etcd_client.Client, error) { + c.clientMu.Lock() + defer c.clientMu.Unlock() + + if _, ok := c.clients[cell]; !ok { + client, err := sd.NewEtcdClient( + c.id, + cell, + ) + if err != nil { + return nil, err + } + c.clients[cell] = client + } + + return c.clients[cell], nil +} + +func newDir(owner string) *ephs_proto.FsEntry { + return &ephs_proto.FsEntry{ + Content: &ephs_proto.FsEntry_Content{ + Content: &ephs_proto.FsEntry_Content_Directory{ + Directory: &ephs_proto.FsEntry_Directory{}, + }, + }, + Owner: owner, + Created: timestamppb.Now(), + Modified: timestamppb.Now(), + Version: 1, + } +} diff --git a/ephs/servicer/s3.go b/ephs/ephsll/s3.go similarity index 83% rename from ephs/servicer/s3.go rename to ephs/ephsll/s3.go index 32ce492..d526de6 100644 --- a/ephs/servicer/s3.go +++ b/ephs/ephsll/s3.go @@ -1,4 +1,4 @@ -package servicer +package ephsll import ( "errors" @@ -15,7 +15,7 @@ var s3Bucket = "ephs" var s3Prefix = "" func WithAWSEnvCredentials() Option { - return option.NewOption(func(s *ephsServicer) error { + return option.NewOption(func(s *ephsLowLevelClientImpl) error { s.s3Creds = credentials.NewEnvAWS() cc := &credentials.CredContext{ @@ -28,7 +28,7 @@ func WithAWSEnvCredentials() Option { } func WithAWSCredentialFile(filename string) Option { - return option.NewOption(func(s *ephsServicer) error { + return option.NewOption(func(s *ephsLowLevelClientImpl) error { s.s3Creds = credentials.NewFileAWSCredentials(filename, "default") cc := &credentials.CredContext{ @@ -39,7 +39,7 @@ func WithAWSCredentialFile(filename string) Option { }) } -func (s *ephsServicer) newS3Client() (*minio.Client, error) { +func (s *ephsLowLevelClientImpl) newS3Client() (*minio.Client, error) { if !flag.Parsed() { return nil, errors.New("flags not yet parsed") } diff --git a/ephs/path.go b/ephs/path.go new file mode 100644 index 0000000..3cf3058 --- /dev/null +++ b/ephs/path.go @@ -0,0 +1,148 @@ +package ephs + +import ( + "errors" + "flag" + "path" + "strings" + + "go.fuhry.dev/runtime/utils/hostname" +) + +type Path interface { + AclPath() string + Cell() string + EtcdKey() string + LiteralCell() string + String() string + Lineage() []Path + // Parent returns the parent directory of this path. + // + // If this path is the root path of a cell, + Parent() Path + Parents() []Path + Child(string) Path + Basename() string +} + +type ephsPath string + +const ( + localCell = "local" + cellIndex = 2 +) + +var ephsDefaultCell string + +var ErrNotEphsPath = errors.New("not an ephs path") + +func DefaultCell() string { + return ephsDefaultCell +} + +func IsEphsPath(p string) bool { + if !strings.HasPrefix(p, KeyPrefix) { + return false + } + + // "/ephs/cell.example.com/subdir" + // 0 1 2 3 + parts := strings.Split(p, PathSeparator) + if len(parts) < 3 { + return false + } + + // all path components must be non-empty except the first and last + for i := 1; i < len(parts)-1; i++ { + if parts[i] == "" { + return false + } + } + + // "." and ".." are not supported as path components + for _, c := range parts { + if c == "." || c == ".." { + return false + } + } + + return true +} + +func ParsePath(p string) (Path, error) { + if !IsEphsPath(p) { + return nil, ErrNotEphsPath + } + + return ephsPath(p), nil +} + +func (p ephsPath) AclPath() string { + parts := strings.Split(string(p), PathSeparator) + return strings.Join(parts[3:], PathSeparator) +} + +func (p ephsPath) Basename() string { + return path.Base(string(p)) +} + +func (p ephsPath) Cell() string { + cell := p.LiteralCell() + if cell == localCell { + return ephsDefaultCell + } + return cell +} + +func (p ephsPath) LiteralCell() string { + parts := strings.Split(string(p), PathSeparator) + return parts[cellIndex] +} + +func (p ephsPath) EtcdKey() string { + parts := strings.Split(string(p), PathSeparator) + if parts[cellIndex] == localCell { + parts[cellIndex] = ephsDefaultCell + } + + return strings.Join(parts, PathSeparator) +} + +func (p ephsPath) Parent() Path { + parts := strings.Split(string(p), PathSeparator) + if len(parts) == cellIndex+1 { + return p + } + + parent := strings.Join(parts[:len(parts)-1], PathSeparator) + return ephsPath(parent) +} + +func (p ephsPath) Parents() []Path { + var parents []Path + parts := strings.Split(string(p), PathSeparator) + for i := 3; i < len(parts); i++ { + parents = append(parents, ephsPath(strings.Join(parts[:i], PathSeparator))) + } + + return parents +} + +func (p ephsPath) Lineage() []Path { + lineage := p.Parents() + lineage = append(lineage, p) + return lineage +} + +func (p ephsPath) Child(name string) Path { + return ephsPath(string(p) + PathSeparator + name) +} + +func (p ephsPath) String() string { + return string(p) +} + +func init() { + flag.StringVar(&defaultClientAddr, "ephs.addr", "", "comma-separated list of server addresses for ephs; leave blank to use service discovery") + flag.StringVar(&ephsDefaultCell, "ephs.cell", hostname.DomainName(), "ephs cell") +} diff --git a/ephs/servicer/BUILD.bazel b/ephs/servicer/BUILD.bazel index cff5bc5..17213fb 100644 --- a/ephs/servicer/BUILD.bazel +++ b/ephs/servicer/BUILD.bazel @@ -4,36 +4,27 @@ go_library( name = "servicer", srcs = [ "acl.go", - "fs_object.go", - "s3.go", "servicer.go", "writer.go", ], importpath = "go.fuhry.dev/runtime/ephs/servicer", visibility = ["//visibility:public"], deps = [ - "//constants", "//ephs", + "//ephs/ephsll", "//grpc", "//mtls", "//mtls/fsnotify", "//proto/service/ephs", - "//sd", - "//utils/hostname", "//utils/log", "//utils/option", "//utils/stringmatch", - "@com_github_minio_minio_go_v7//:minio-go", - "@com_github_minio_minio_go_v7//pkg/credentials", "@in_gopkg_yaml_v3//:yaml_v3", - "@io_etcd_go_etcd_client_v3//:client", - "@io_etcd_go_etcd_client_v3//concurrency", "@org_golang_google_grpc//codes", "@org_golang_google_grpc//peer", "@org_golang_google_grpc//status", "@org_golang_google_protobuf//proto", "@org_golang_google_protobuf//types/known/emptypb", - "@org_golang_google_protobuf//types/known/timestamppb", ], ) diff --git a/ephs/servicer/servicer.go b/ephs/servicer/servicer.go index 982504f..59eb0bf 100644 --- a/ephs/servicer/servicer.go +++ b/ephs/servicer/servicer.go @@ -1,35 +1,29 @@ +// Package servicer implements the gRPC servicer for ephs. +// +// The servicer implements all access control logic. +// +// Interaction with etcd takes place in the `go.fuhry.dev/runtime/ephs/ephsll` package. + package servicer import ( "bytes" "context" "errors" - "flag" - "fmt" "io" - "path" - "slices" - "strings" - "sync" - - "github.com/minio/minio-go/v7" - "github.com/minio/minio-go/v7/pkg/credentials" - etcd_client "go.etcd.io/etcd/client/v3" - etcd_concurrency "go.etcd.io/etcd/client/v3/concurrency" + "google.golang.org/grpc/codes" "google.golang.org/grpc/peer" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/emptypb" - "google.golang.org/protobuf/types/known/timestamppb" "go.fuhry.dev/runtime/ephs" + "go.fuhry.dev/runtime/ephs/ephsll" "go.fuhry.dev/runtime/grpc" "go.fuhry.dev/runtime/mtls" "go.fuhry.dev/runtime/mtls/fsnotify" ephs_proto "go.fuhry.dev/runtime/proto/service/ephs" - "go.fuhry.dev/runtime/sd" - "go.fuhry.dev/runtime/utils/hostname" "go.fuhry.dev/runtime/utils/log" "go.fuhry.dev/runtime/utils/option" ) @@ -43,12 +37,10 @@ type msgWithPath interface { type ephsServicer struct { ephs_proto.EphsServer - acl *Acl - logger log.Logger - clientMu sync.Mutex - clients map[string]*etcd_client.Client - s3Creds *credentials.Credentials - s3Client *minio.Client + acl *Acl + logger log.Logger + ll ephsll.EphsLowLevelClient + llOpts []ephsll.Option } var _ ephs_proto.EphsServer = &ephsServicer{} @@ -92,185 +84,83 @@ func WithAclFile(aclFile string) Option { }) } +func WithAWSEnvCredentials() Option { + return option.NewOption(func(s *ephsServicer) error { + s.llOpts = append(s.llOpts, ephsll.WithAWSEnvCredentials()) + return nil + }) +} + +func WithAWSCredentialFile(filename string) Option { + return option.NewOption(func(s *ephsServicer) error { + s.llOpts = append(s.llOpts, ephsll.WithAWSCredentialFile(filename)) + return nil + }) +} + func NewEphsServicer(opts ...Option) (ephs_proto.EphsServer, error) { serv := &ephsServicer{ - clients: make(map[string]*etcd_client.Client), - logger: log.Default().WithPrefix("ephsServicer"), - } - if _, err := serv.clientForCell(cell); err != nil { - return nil, err + logger: log.Default().WithPrefix("ephsServicer"), } + for _, o := range opts { if err := o.Apply(serv); err != nil { return nil, err } } - s3, err := serv.newS3Client() + ll, err := ephsll.NewEphsLowLevelClient(mtls.DefaultIdentity(), serv.llOpts...) if err != nil { return nil, err } - serv.s3Client = s3 + serv.ll = ll return serv, nil } -func (s *ephsServicer) clientForCell(cell string) (*etcd_client.Client, error) { - s.clientMu.Lock() - defer s.clientMu.Unlock() - - if _, ok := s.clients[cell]; !ok { - client, err := sd.NewEtcdClient( - mtls.DefaultIdentity(), - cell, - ) - if err != nil { - return nil, err - } - s.clients[cell] = client - } - - return s.clients[cell], nil -} - -func (s *ephsServicer) checkPath(ctx context.Context, path string) (string, error) { - if !ephs.IsEphsPath(path) { - return "", status.Errorf(codes.InvalidArgument, "path %q must start with "+ephs.KeyPrefix, path) - } - - parts := strings.Split(path, "/") - - if len(parts) < 3 { - return "", status.Errorf( +func (s *ephsServicer) checkPath(ctx context.Context, p string) (ephs.Path, error) { + ephsPath, err := ephs.ParsePath(p) + if err != nil { + return nil, status.Errorf( codes.InvalidArgument, - "path must be at least 3 levels deep: %q", - path) - } - - for i, part := range parts { - if (i > 0 && part == "") || part == "." || part == ".." { - return "", status.Errorf(codes.InvalidArgument, - "path %q is invalid: may not contain empty elements, '.' or '..'", path) - } + "failed to parse ephs path %q: %v", + p, err) } ident, err := identityFromContext(ctx) if err != nil { - return "", err - } - - aclPath := strings.Join(parts[3:], "/") - - if parts[2] == "local" { - parts[2] = cell + return nil, err } if s.acl != nil { - if !s.acl.Check(ident, aclPath) { - return "", status.Errorf( + if !s.acl.Check(ident, ephsPath.AclPath()) { + return nil, status.Errorf( codes.PermissionDenied, "access to path %q denied for %q (rule path: %q)", - strings.Join(parts, "/"), + ephsPath.String(), ident, - aclPath) + ephsPath.AclPath()) } } - return strings.Join(parts, "/"), nil + return ephsPath, nil } func (s *ephsServicer) Stat(ctx context.Context, req *ephs_proto.GetRequest) (*ephs_proto.StatResponse, error) { s.logRequest(ctx, "Stat", req) - entry, err := s.getPath(ctx, req.Path) + ephsPath, err := s.checkPath(ctx, req.Path) if err != nil { return nil, err } - return &ephs_proto.StatResponse{ - Entry: entry, - }, nil -} - -func (s *ephsServicer) getPath(ctx context.Context, path string) (*ephs_proto.FsEntry, error) { - var err error - if path, err = s.checkPath(ctx, path); err != nil { - return nil, err - } - - cell := strings.Split(path, "/")[2] - etcd, err := s.clientForCell(cell) - if err != nil { - return nil, err - } - - obj, err := etcd.Get(ctx, path) + entry, err := s.ll.Stat(ctx, ephsPath) if err != nil { return nil, err } - if len(obj.Kvs) < 1 { - return nil, status.Error( - codes.NotFound, - path) - } - - entry := &ephs_proto.FsEntry{} - if err := proto.Unmarshal(obj.Kvs[0].Value, entry); err != nil { - return nil, status.Errorf( - codes.DataLoss, - "failed to unmarshal key %q as %s: %v", - obj.Kvs[0].Key, - proto.MessageName(entry), - err) - } - - return entry, nil -} - -func (s *ephsServicer) modifyDirectory(ctx context.Context, path string, apply func(*ephs_proto.FsEntry_Directory, etcd_concurrency.STM) error) error { - var err error - if path, err = s.checkPath(ctx, path); err != nil { - return err - } - - cell := strings.Split(path, "/")[2] - etcd, err := s.clientForCell(cell) - if err != nil { - return err - } - - txnResp, err := etcd_concurrency.NewSTM(etcd, func(stm etcd_concurrency.STM) error { - dirRaw := []byte(stm.Get(path)) - dir := newDir(ctx) - if len(dirRaw) > 0 { - if err := proto.Unmarshal(dirRaw, dir); err != nil { - return fmt.Errorf("error unmarshaling FsEntry at %q: %v", path, err) - } - } - - if dir.Content.GetDirectory() == nil { - return fmt.Errorf("ephs path %q: not a directory", path) - } - - if err = apply(dir.Content.GetDirectory(), stm); err != nil { - return fmt.Errorf("error performing atomic directory modification: %v", err) - } - - dir.Modified = timestamppb.Now() - dir.Version += 1 - - if dirRaw, err = proto.Marshal(dir); err != nil { - return fmt.Errorf("error re-marshaling FsEntry at %q after calling apply: %v", path, err) - } - - stm.Put(path, string(dirRaw)) - - return nil - }) - - s.logger.Debugf("transaction status modifying directory %q: %+v", path, txnResp) - - return err + return &ephs_proto.StatResponse{ + Entry: entry, + }, nil } func (s *ephsServicer) logRequest(ctx context.Context, method string, req msgWithPath) { @@ -286,32 +176,31 @@ func (s *ephsServicer) Get(req *ephs_proto.GetRequest, server ephs_proto.Ephs_Ge ctx := server.Context() s.logRequest(ctx, "Get", req) - entry, err := s.getPath(ctx, req.Path) + ephsPath, err := s.checkPath(ctx, req.Path) if err != nil { return err } - if entry.Content.GetDirectory() != nil { - return status.Errorf( - codes.FailedPrecondition, - "%q: is a directory", - req.Path) + entry, reader, err := s.ll.Get(ctx, ephsPath) + if err != nil { + if errors.Is(err, ephsll.ErrDirectory) { + return status.Errorf( + codes.FailedPrecondition, + "%q: is a directory", + req.Path) + } + + return err } - entryCopy := *entry + entryCopy := proto.Clone(entry).(*ephs_proto.FsEntry) response := &ephs_proto.GetResponse{ - Entry: &entryCopy, + Entry: entryCopy, } response.Entry.Content = nil - ent := &FsEntry{entry} - reader, err := ent.GetContents(ctx, s.s3Client) - if err != nil { - return err - } - writer := &responseWriter{server: server, response: response} n, err := io.Copy(writer, reader) if err != nil { @@ -323,154 +212,25 @@ func (s *ephsServicer) Get(req *ephs_proto.GetRequest, server ephs_proto.Ephs_Ge return nil } -func (s *ephsServicer) mkdir(ctx context.Context, pathName string, recursive bool) error { - parts := strings.Split(pathName, "/") - - cell := parts[2] - etcd, err := s.clientForCell(cell) - if err != nil { - return err - } - - txn, err := etcd_concurrency.NewSTM(etcd, func(stm etcd_concurrency.STM) error { - var ( - dir *ephs_proto.FsEntry = newDir(ctx) - err error - ) - for i := 3; i < len(parts); i++ { - parent := strings.Join(parts[:i], "/") - - dirRaw := []byte(stm.Get(parent)) - if len(dirRaw) == 0 { - if !recursive { - return status.Errorf(codes.NotFound, "cannot mkdir %q: parent directory %q does not exist and recursion is disabled", pathName, parent) - } - - dir = newDir(ctx) - } else { - err = proto.Unmarshal(dirRaw, dir) - if err != nil { - return status.Errorf(codes.Internal, - "error unmarshaling parent directory %q: %T: %v", - parent, err, err) - } - - if dir.Content.GetDirectory() == nil { - return status.Errorf( - codes.FailedPrecondition, - "cannot mkdir %q: parent item %q is not a directory (raw len: %d)", - pathName, parent, len(dirRaw)) - } - - dir.Modified = timestamppb.Now() - dir.Version += 1 - } - - dEnt := dir.Content.GetDirectory() - - dEnt.Entries = append(dEnt.Entries, &ephs_proto.FsEntry_Directory_DirectoryEntry{ - Name: parts[i], - Directory: true, - }) - - dirRaw, err = proto.Marshal(dir) - if err != nil { - return status.Errorf(codes.Internal, - "error marshaling parent directory %q: %T: %v", - parent, err, err) - } - - stm.Put(parent, string(dirRaw)) - } - - entry := []byte(stm.Get(pathName)) - if len(entry) > 0 { - return status.Errorf(codes.AlreadyExists, - "cannot create directory %q: already exists", - pathName) - } - - dirRaw, err := proto.Marshal(newDir(ctx)) - if err != nil { - return status.Errorf(codes.Internal, - "error marshaling new directory %q: %T: %v", - pathName, err, err) - } - - stm.Put(pathName, string(dirRaw)) - return nil - }) - if err != nil { - return err - } - if !txn.Succeeded { - return fmt.Errorf("transaction failed: %+v", txn) - } - - return nil -} - func (s *ephsServicer) MkDir(ctx context.Context, req *ephs_proto.MkDirRequest) (*emptypb.Empty, error) { path, err := s.checkPath(ctx, req.Path) if err != nil { return nil, err } - return &emptypb.Empty{}, s.mkdir(ctx, path, req.Recursive) + whoami, _ := identityFromContext(ctx) + + return &emptypb.Empty{}, s.ll.Mkdir(ctx, path, whoami, req.Recursive) } func (s *ephsServicer) Delete(ctx context.Context, req *ephs_proto.DeleteRequest) (*emptypb.Empty, error) { s.logRequest(ctx, "Delete", req) - targetPath, err := s.checkPath(ctx, req.Path) - if err != nil { - return nil, err - } - - entry, err := s.getPath(ctx, targetPath) + ephsPath, err := s.checkPath(ctx, req.Path) if err != nil { return nil, err } - fse := &FsEntry{entry} - if fse.Content.GetDirectory() != nil { - if len(fse.Content.GetDirectory().Entries) > 0 && !req.Recursive { - return nil, status.Errorf(codes.FailedPrecondition, - "refusing to delete non-empty directory %q without recursion flag set", - targetPath) - } - - for _, ent := range fse.Content.GetDirectory().Entries { - subReq := &ephs_proto.DeleteRequest{ - Path: path.Join(targetPath, ent.Name), - Recursive: true, - } - _, err = s.Delete(ctx, subReq) - if err != nil { - return nil, err - } - } - } else { - if err := fse.DeleteContents(ctx, s.s3Client); err != nil { - return nil, err - } - } - - dir := path.Dir(targetPath) - err = s.modifyDirectory(ctx, dir, func(dEnt *ephs_proto.FsEntry_Directory, stm etcd_concurrency.STM) error { - index := -1 - for i, ent := range dEnt.Entries { - if ent.Name == path.Base(targetPath) { - index = i - break - } - } - if index >= 0 { - dEnt.Entries = slices.Delete(dEnt.Entries, index, index+1) - } - stm.Del(targetPath) - return nil - }) - if err != nil { + if err := s.ll.Delete(ctx, ephsPath, req.Recursive); err != nil { return nil, err } @@ -491,170 +251,47 @@ func (s *ephsServicer) Put(server ephs_proto.Ephs_PutServer) error { return err } - targetPath, err := s.checkPath(ctx, msg.Path) - if err != nil { - return err - } - - dir := path.Dir(targetPath) - _, err = s.getPath(ctx, dir) + ephsPath, err := s.checkPath(ctx, msg.Path) if err != nil { - if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound { - return status.Errorf( - codes.FailedPrecondition, - "cannot write %q: parent directory %q does not exist", - targetPath, dir) - } return err } - pbEnt, err := s.getPath(ctx, targetPath) - if err != nil { - if status, ok := status.FromError(err); ok && status.Code() == codes.NotFound { - // NotFound is only returned after all access checks have succeeded. Ok to - // create file. - now := timestamppb.Now() - pbEnt = &ephs_proto.FsEntry{ - Created: now, - Modified: now, - Version: msg.Version - 1, - Size: msg.Size, - Owner: peerId, - } - } else { - // some other error besides not-found - return err - } - } else { - if msg.Version != 0 && msg.Version != (pbEnt.Version+1) { - return status.Errorf( - codes.FailedPrecondition, - "version conflict: attempted to overwrite version %d with %d, "+ - "version number must be current version + 1", - pbEnt.Version, - msg.Version) - } - } - ent := &FsEntry{pbEnt} - if pbEnt.Content.GetDirectory() != nil { - return status.Errorf(codes.FailedPrecondition, - "cannot write %q: path already exists and is a directory", - targetPath) - } - reader := &putReader{ initialMsg: msg, server: server, buf: &bytes.Buffer{}, } - if err := ent.SetContents(ctx, s.s3Client, reader, msg.Size); err != nil { - return err - } - - ent.FsEntry.Modified = timestamppb.Now() - ent.FsEntry.Size = msg.Size - ent.FsEntry.Version++ - - marshaled, err := proto.Marshal(ent.FsEntry) - if err != nil { - return err - } - - err = s.modifyDirectory(ctx, dir, func(dEnt *ephs_proto.FsEntry_Directory, stm etcd_concurrency.STM) error { - stm.Put(targetPath, string(marshaled)) - base := path.Base(targetPath) - for _, ent := range dEnt.Entries { - if ent.Name == base { - return nil - } - } - - dEnt.Entries = append(dEnt.Entries, &ephs_proto.FsEntry_Directory_DirectoryEntry{ - Name: base, - Directory: false, - }) - - return nil - }) - - if err != nil { - _ = ent.DeleteContents(ctx, s.s3Client) - return err - } + ent, err := s.ll.Put(ctx, ephsPath, peerId, msg.Version, msg.Size, reader) response := &ephs_proto.StatResponse{ - Entry: ent.FsEntry, + Entry: ent, } return server.SendAndClose(response) } func (s *ephsServicer) Watch(req *ephs_proto.GetRequest, server ephs_proto.Ephs_WatchServer) error { - ctx := etcd_client.WithRequireLeader(server.Context()) + ctx := server.Context() s.logRequest(ctx, "Watch", req) - path, err := s.checkPath(ctx, req.Path) - if err != nil { - return err - } - - response := &ephs_proto.WatchResponse{ - Entry: nil, - Event: ephs_proto.WatchResponse_DELETE, - } - - if resp, err := s.getPath(ctx, path); err == nil { - response.Entry = resp - response.Event = ephs_proto.WatchResponse_CREATE - } - - cell := strings.Split(path, "/")[2] - etcd, err := s.clientForCell(cell) + ephsPath, err := s.checkPath(ctx, req.Path) if err != nil { return err } - if err := server.Send(response); err != nil { - return err - } - for ctx.Err() == nil { - watcher := etcd.Watch(ctx, path) - for msg := range watcher { - s.logger.Debugf("watcher: got event: %+v", msg) - for _, event := range msg.Events { - if string(event.Kv.Key) != path { - continue - } - - response.Entry = &ephs_proto.FsEntry{} - - if event.IsCreate() { - response.Event = ephs_proto.WatchResponse_CREATE - } else if event.IsModify() { - response.Event = ephs_proto.WatchResponse_MODIFY - } else if event.Type == etcd_client.EventTypeDelete { - response.Entry = nil - response.Event = ephs_proto.WatchResponse_DELETE - } else { - continue - } - - if response.Entry != nil { - if err = proto.Unmarshal(event.Kv.Value, response.Entry); err != nil { - s.logger.Errorf("protobuf marshal error: %v", err) - return err - } - } - - if err := server.Send(response); err != nil { - s.logger.Errorf("stream send error: %v", err) - return err - } + ch, err := s.ll.Watch(ctx, ephsPath) + if err != nil { + return err + } + + for msg := range ch { + if err := server.Send(msg); err != nil { + return err } } } - s.logger.Noticef("Watch(%q) ending: ctx err: %v", path, ctx.Err()) + s.logger.V(1).Debugf("Watch(%q) ending: ctx err: %v", ephsPath.String(), ctx.Err()) if errors.Is(ctx.Err(), context.Canceled) { return status.Error(codes.Unavailable, "server is shutting down, please reconnect") @@ -676,23 +313,3 @@ func identityFromContext(ctx context.Context) (string, error) { return ident.String(), nil } - -func newDir(ctx context.Context) *ephs_proto.FsEntry { - whoami, _ := identityFromContext(ctx) - - return &ephs_proto.FsEntry{ - Content: &ephs_proto.FsEntry_Content{ - Content: &ephs_proto.FsEntry_Content_Directory{ - Directory: &ephs_proto.FsEntry_Directory{}, - }, - }, - Owner: whoami, - Created: timestamppb.Now(), - Modified: timestamppb.Now(), - Version: 1, - } -} - -func init() { - flag.StringVar(&cell, "ephs.cell", hostname.DomainName(), "ephs cell") -} diff --git a/ephs/servicer/writer.go b/ephs/servicer/writer.go index 6cccc6c..4d3d6d7 100644 --- a/ephs/servicer/writer.go +++ b/ephs/servicer/writer.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" + "go.fuhry.dev/runtime/ephs/ephsll" ephs_proto "go.fuhry.dev/runtime/proto/service/ephs" "go.fuhry.dev/runtime/utils/log" ) @@ -15,7 +16,7 @@ type responseWriter struct { func (w *responseWriter) Write(p []byte) (int, error) { log.Default().V(3).Debugf("responseWriter: writing %d bytes", len(p)) - chunkLen := min(len(p), LargeFileThreshold) + chunkLen := min(len(p), ephsll.LargeFileThreshold) w.response.Chunk = make([]byte, chunkLen) copy(w.response.Chunk, p) diff --git a/grpc/internal/client/BUILD.bazel b/grpc/internal/client/BUILD.bazel index 767ab92..40a50e8 100644 --- a/grpc/internal/client/BUILD.bazel +++ b/grpc/internal/client/BUILD.bazel @@ -21,6 +21,7 @@ go_library( "//net/dns", "//sd", "//utils/context", + "//utils/log", "//utils/option", "@org_golang_google_grpc//:grpc", ], diff --git a/grpc/internal/client/client.go b/grpc/internal/client/client.go index 5551948..990e7c7 100644 --- a/grpc/internal/client/client.go +++ b/grpc/internal/client/client.go @@ -4,10 +4,13 @@ import ( "context" "fmt" "net" + "strconv" "go.fuhry.dev/runtime/grpc/internal/common" "go.fuhry.dev/runtime/mtls" + "go.fuhry.dev/runtime/net/dns" "go.fuhry.dev/runtime/sd" + "go.fuhry.dev/runtime/utils/log" "go.fuhry.dev/runtime/utils/option" "google.golang.org/grpc" ) @@ -27,6 +30,7 @@ type AddressProvider interface { type client struct { ctx context.Context + logger log.Logger serverId mtls.Identity clientId mtls.Identity watcher AddressProvider @@ -55,24 +59,48 @@ func WithAddressProvider(ap AddressProvider) ClientOption { }) } -func WithStaticAddress(addresses ...*net.TCPAddr) ClientOption { - var addrs []sd.ServiceAddress - - for _, addr := range addresses { - var ip4, ip6 string - if len(addr.IP) == 4 { - ip4 = addr.IP.String() - } else { - ip6 = addr.IP.String() - } - addrs = append(addrs, sd.ServiceAddress{ - IP4: ip4, - IP6: ip6, - Port: uint16(addr.Port), - }) - } - +func WithStaticAddress(addresses ...string) ClientOption { return option.NewOption(func(c *client) error { + var addrs []sd.ServiceAddress + + for i, addr := range addresses { + host, port, err := net.SplitHostPort(addr) + if err != nil { + return fmt.Errorf("failed to parse host:port %q at index %d: %v", addr, i, err) + } + + portNumber, err := net.LookupPort("tcp", port) + if err != nil { + return fmt.Errorf("failed to lookup named port %q at index %d: %v", port, i, err) + } + + var ip4, ip6 string + if ip := net.ParseIP(host); ip != nil { + if len(ip) == 4 { + ip4 = ip.String() + } else { + ip6 = ip.String() + } + } else { + ip4, ip6, err = dns.ResolveDualStack(host) + if err != nil { + return fmt.Errorf("failed to lookup host %q at index %d: %v", host, i, err) + } + } + + if ip4 == "" && ip6 == "" { + return fmt.Errorf( + "failed to parse address %q at index %d: IPv4 and IPv6 addresses both empty", + addr, i) + } + + addrs = append(addrs, sd.ServiceAddress{ + Hostname: host, + IP4: ip4, + IP6: ip6, + Port: uint16(portNumber), + }) + } c.watcher = &staticAddressProvider{ addresses: addrs, } @@ -94,6 +122,7 @@ func WithDNSSRV() ClientOption { func NewGrpcClient(ctx context.Context, serverId string, clientId mtls.Identity, opts ...ClientOption) (Client, error) { cl := &client{ ctx: ctx, + logger: log.Default().WithPrefix(fmt.Sprintf("grpc.Client[%s]", serverId)), serverId: mtls.NewRemoteServiceIdentity(serverId), clientId: clientId, connFac: common.NewDefaultConnectionFactory(), @@ -145,7 +174,8 @@ func (c *client) Conn() (*grpc.ClientConn, error) { dialer, } - target := fmt.Sprintf("%s:%d", addrs[0].Hostname, addrs[0].Port) + target := net.JoinHostPort(addrs[0].Hostname, strconv.Itoa(int(addrs[0].Port))) + c.logger.V(1).Infof("establishing conn to %s", target) conn, err := grpc.DialContext(c.ctx, target, opts...) if err != nil { return nil, err