]> go.fuhry.dev Git - runtime.git/commitdiff
[ephs] split server into servicer and low-level client main
authorDan Fuhry <dan@fuhry.com>
Sat, 22 Nov 2025 03:01:18 +0000 (22:01 -0500)
committerDan Fuhry <dan@fuhry.com>
Sat, 22 Nov 2025 03:10:33 +0000 (22:10 -0500)
Make the ephs servicer act as a front-end to the ephs low-level client library, which handles all direct interaction with etcd.

- Move all ephs etcd interaction from `ephs/servicer` to `ephs/ephsll`
- Add ephs.Path type to make ephs path transformations simpler
- Refactor grpc's `WithStaticAddress` option to support DNS names

15 files changed:
cmd/ephs_client/main.go
cmd/ephs_server/main.go
cmd/grpc_health_probe/main.go
ephs/BUILD.bazel
ephs/client.go
ephs/ephsll/BUILD.bazel [new file with mode: 0644]
ephs/ephsll/fs_object.go [moved from ephs/servicer/fs_object.go with 73% similarity]
ephs/ephsll/low_level_client.go [new file with mode: 0644]
ephs/ephsll/s3.go [moved from ephs/servicer/s3.go with 83% similarity]
ephs/path.go [new file with mode: 0644]
ephs/servicer/BUILD.bazel
ephs/servicer/servicer.go
ephs/servicer/writer.go
grpc/internal/client/BUILD.bazel
grpc/internal/client/client.go

index da5aeaaee60398b1558da3174eb8db8cd170234b..93b4f6dc93144f4cbac8a882bb46422bd27ec0a2 100644 (file)
@@ -379,7 +379,10 @@ func main() {
                },
        }
 
-       if err := cmd.Run(ctx, os.Args); err != nil {
+       args := os.Args[:1]
+       args = append(args, flag.Args()...)
+
+       if err := cmd.Run(ctx, args); err != nil {
                log.Fatal(err)
                os.Exit(1)
        }
index 288696e1bd76801f51d78a9c8156a0be455c4c2e..79685cc0784cdda85fe3f25721707c80f3ef22fd 100644 (file)
@@ -2,6 +2,7 @@ package main
 
 import (
        "flag"
+       "os"
 
        "go.fuhry.dev/runtime/ephs/servicer"
        "go.fuhry.dev/runtime/grpc"
@@ -16,6 +17,8 @@ import (
 func main() {
        var err error
 
+       mtls.SetDefaultIdentity("ephs")
+
        acl := flag.String("ephs.acl", "", "YAML file containing ACLs for ephs")
        awsFile := flag.String("ephs.s3-creds-file", "", "file to load AWS S3 credentials from")
        awsEnv := flag.Bool("ephs.s3-creds-env", false, "set to true to load AWS credentials from environment")
@@ -25,7 +28,8 @@ func main() {
        serverIdentity := mtls.DefaultIdentity()
        s, err := grpc.NewGrpcServer(serverIdentity, grpc.WithTransport(&grpc.QUICConnectionFactory{}))
        if err != nil {
-               panic(err)
+               log.Fatal(err)
+               os.Exit(1)
        }
 
        var opts []servicer.Option
@@ -42,6 +46,7 @@ func main() {
        serv, err := servicer.NewEphsServicer(opts...)
        if err != nil {
                log.Fatal(err)
+               os.Exit(1)
        }
 
        ctx, cancel := context.Interruptible()
@@ -51,7 +56,8 @@ func main() {
                ephs_pb.RegisterEphsServer(s, serv)
        })
        if err != nil {
-               panic(err)
+               log.Fatal(err)
+               os.Exit(1)
        }
        defer s.Stop()
 
index 244d9ebc1207d80b8c389881e9cde29038eccbfc..be9dec05d674ed6b16f5eedde7901c2c8e99b33e 100644 (file)
@@ -2,9 +2,7 @@ package main
 
 import (
        "flag"
-       "net"
        "os"
-       "strconv"
        "time"
 
        "go.fuhry.dev/runtime/grpc"
@@ -33,24 +31,7 @@ func main() {
        var opts []grpc.ClientOption
 
        if *serverAddr != "" {
-               host, port, err := net.SplitHostPort(*serverAddr)
-               if err != nil {
-                       log.Default().Panic(err)
-               }
-
-               portInt, err := strconv.Atoi(port)
-               if err != nil {
-                       portInt, err = net.LookupPort("tcp", port)
-                       if err != nil {
-                               log.Default().Panic(err)
-                       }
-               }
-
-               if ip := net.ParseIP(host); ip == nil {
-                       log.Default().Panicf("%q: not a valid IPv4 or IPv6 address", host)
-               } else {
-                       opts = append(opts, grpc.WithStaticAddress(&net.TCPAddr{ip, portInt, ""}))
-               }
+               opts = append(opts, grpc.WithStaticAddress(*serverAddr))
        }
 
        ctx, cancel := context.Interruptible()
index 8c1413f4f9343d3b5806ad7cdbf5061cc73ad7a7..e4f36dd2ddb2c427cb83aad9b672a7e1590ae4d4 100644 (file)
@@ -2,7 +2,10 @@ load("@rules_go//go:def.bzl", "go_library")
 
 go_library(
     name = "ephs",
-    srcs = ["client.go"],
+    srcs = [
+        "client.go",
+        "path.go",
+    ],
     importpath = "go.fuhry.dev/runtime/ephs",
     visibility = ["//visibility:public"],
     deps = [
@@ -11,6 +14,7 @@ go_library(
         "//mtls",
         "//proto/service/ephs",
         "//utils/context",
+        "//utils/hostname",
         "//utils/log",
         "//utils/option",
         "@com_github_quic_go_quic_go//:quic-go",
index d4eac3b226d1f8d5d861bf12f471fbe2a523dcd9..5a1943db0b5aecb8850ba8d0eb550df7fbbfb238 100644 (file)
@@ -1,3 +1,13 @@
+// Package ephs is the public-facing, high-level client library for interacting with ephs.
+//
+// ephs (pronounced as "effs") is "ephemeral filesystem." It is a filesystem implemented on top of
+// etcd and S3-compatible object storage, with object storage being used for files above a certain
+// size threshold. Presently this is 256 KiB or one chunk.
+//
+// Secure deployment of ephs requires either a dedicated etcd instance or careful configuration of
+// etcd access such that reads and writes of keys beginning with `/ephs/` are restricted to the ephs
+// gRPC service.
+
 package ephs
 
 import (
@@ -23,6 +33,7 @@ import (
 )
 
 const KeyPrefix = "/ephs/"
+const PathSeparator = "/"
 const ChunkSize = 262144
 
 const formatEntryDateFormat = "Monday, 2 Jan 2006 15:04:05 -0700"
@@ -55,8 +66,9 @@ type clientImpl struct {
        defaultTimeout time.Duration
        id             mtls.Identity
 
-       client grpc.Client
-       conn   *grpc.ClientConn
+       grpcOpts []grpc.ClientOption
+       client   grpc.Client
+       conn     *grpc.ClientConn
 }
 
 type getReader struct {
@@ -74,13 +86,11 @@ var ephsQuicConfig = &quic.Config{
        KeepAlivePeriod:      5 * time.Second,
 }
 
+var defaultClientAddr string
+
 var defaultClient Client
 var defaultClientMu sync.Mutex
 
-func IsEphsPath(p string) bool {
-       return strings.HasPrefix(p, KeyPrefix)
-}
-
 type notFoundError struct {
        ephsPath string
 }
@@ -162,6 +172,13 @@ func WithDefaultTimeout(d time.Duration) ClientOption {
        })
 }
 
+func withGrpcClientOptions(opts ...grpc.ClientOption) ClientOption {
+       return option.NewOption(func(c *clientImpl) error {
+               c.grpcOpts = append(c.grpcOpts, opts...)
+               return nil
+       })
+}
+
 func DefaultClient() (Client, error) {
        defaultClientMu.Lock()
        defer defaultClientMu.Unlock()
@@ -174,7 +191,13 @@ func DefaultClient() (Client, error) {
                DefaultClientContext, _ = context.Interruptible()
        }
 
-       client, err := NewClient(DefaultClientContext, mtls.DefaultIdentity())
+       var opts []ClientOption
+       if defaultClientAddr != "" {
+               addrs := strings.Split(defaultClientAddr, ",")
+               opts = append(opts, withGrpcClientOptions(grpc.WithStaticAddress(addrs...)))
+       }
+
+       client, err := NewClient(DefaultClientContext, mtls.DefaultIdentity(), opts...)
        if err != nil {
                return nil, err
        }
@@ -188,6 +211,11 @@ func NewClient(ctx context.Context, localId mtls.Identity, opts ...ClientOption)
                defaultCtx:     ctx,
                defaultTimeout: 15 * time.Second,
                id:             localId,
+               grpcOpts: []grpc.ClientOption{
+                       grpc.WithConnectionFactory(&grpc_common.QUICConnectionFactory{
+                               QUICConfig: ephsQuicConfig.Clone(),
+                       }),
+               },
        }
 
        for _, opt := range opts {
@@ -447,10 +475,7 @@ func (c *clientImpl) grpcClient() (ephs_pb.EphsClient, error) {
        var err error
 
        if c.client == nil {
-               c.client, err = grpc.NewGrpcClient(c.defaultCtx, "ephs", c.id,
-                       grpc.WithConnectionFactory(&grpc_common.QUICConnectionFactory{
-                               QUICConfig: ephsQuicConfig.Clone(),
-                       }))
+               c.client, err = grpc.NewGrpcClient(c.defaultCtx, "ephs", c.id, c.grpcOpts...)
                if err != nil {
                        return nil, fmt.Errorf("error creating grpc client: %T: %v", err, err)
                }
diff --git a/ephs/ephsll/BUILD.bazel b/ephs/ephsll/BUILD.bazel
new file mode 100644 (file)
index 0000000..8601f28
--- /dev/null
@@ -0,0 +1,29 @@
+load("@rules_go//go:def.bzl", "go_library")
+
+go_library(
+    name = "ephsll",
+    srcs = [
+        "fs_object.go",
+        "low_level_client.go",
+        "s3.go",
+    ],
+    importpath = "go.fuhry.dev/runtime/ephs/ephsll",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//constants",
+        "//ephs",
+        "//mtls",
+        "//proto/service/ephs",
+        "//sd",
+        "//utils/log",
+        "//utils/option",
+        "@com_github_minio_minio_go_v7//:minio-go",
+        "@com_github_minio_minio_go_v7//pkg/credentials",
+        "@io_etcd_go_etcd_client_v3//:client",
+        "@io_etcd_go_etcd_client_v3//concurrency",
+        "@org_golang_google_grpc//codes",
+        "@org_golang_google_grpc//status",
+        "@org_golang_google_protobuf//proto",
+        "@org_golang_google_protobuf//types/known/timestamppb",
+    ],
+)
similarity index 73%
rename from ephs/servicer/fs_object.go
rename to ephs/ephsll/fs_object.go
index 94df6c2f5ea9975c182a4b7f0c5be0638c8aebd5..92ef2d95b78defc0126fe3f89f871b6c87e485b1 100644 (file)
@@ -1,4 +1,4 @@
-package servicer
+package ephsll
 
 import (
        "bytes"
@@ -19,8 +19,9 @@ import (
 
 const LargeFileThreshold = ephs.ChunkSize
 
-type FsEntry struct {
+type fsEntryWrapper struct {
        *ephs_proto.FsEntry
+       s3Client *minio.Client
 }
 
 var ErrDirectory = errors.New("is a directory")
@@ -28,20 +29,24 @@ var ErrUnknownCompression = errors.New("file compressed with unknown scheme")
 
 var RegexpLargeFileKey = regexp.MustCompile(`^[0-9A-Za-z_-]{64}$`)
 
-func (e *FsEntry) GetContents(ctx context.Context, s3 *minio.Client) (io.Reader, error) {
+func (e *fsEntryWrapper) Proto() *ephs_proto.FsEntry {
+       return e.FsEntry
+}
+
+func (e *fsEntryWrapper) GetContents(ctx context.Context) (io.Reader, error) {
        switch {
        case e.FsEntry.Content.GetDirectory() != nil:
                return nil, ErrDirectory
        case e.FsEntry.Content.GetFile() != nil:
                return e.readFile()
        case e.FsEntry.Content.GetLargeFile() != nil:
-               return e.readLargeFile(ctx, s3)
+               return e.readLargeFile(ctx)
        default:
                return nil, errors.New("file content not populated with any known type")
        }
 }
 
-func (e *FsEntry) SetContents(ctx context.Context, s3 *minio.Client, r io.Reader, size uint64) error {
+func (e *fsEntryWrapper) SetContents(ctx context.Context, r io.Reader, size uint64) error {
        if e.FsEntry.Content.GetDirectory() != nil {
                return ErrDirectory
        }
@@ -54,7 +59,7 @@ func (e *FsEntry) SetContents(ctx context.Context, s3 *minio.Client, r io.Reader
 
        var err error
        if size >= LargeFileThreshold {
-               err = e.writeLargeFile(ctx, s3, r, size)
+               err = e.writeLargeFile(ctx, r, size)
        } else {
                err = e.writeFile(r, size)
        }
@@ -65,7 +70,7 @@ func (e *FsEntry) SetContents(ctx context.Context, s3 *minio.Client, r io.Reader
        if mustCleanupLargeFile {
                log.Default().Infof("mustCleanupLargeFile: %+v", oldLargeFile)
 
-               err = e.cleanupOldLargeFile(ctx, s3, &oldLargeFile)
+               err = e.cleanupOldLargeFile(ctx, &oldLargeFile)
                if err != nil {
                        log.Default().Alert(
                                "error while cleaning up old large file with key %q: %v",
@@ -77,7 +82,7 @@ func (e *FsEntry) SetContents(ctx context.Context, s3 *minio.Client, r io.Reader
        return nil
 }
 
-func (e *FsEntry) DeleteContents(ctx context.Context, s3 *minio.Client) error {
+func (e *fsEntryWrapper) DeleteContents(ctx context.Context) error {
        switch {
        case e.FsEntry.Content.GetDirectory() != nil:
                return ErrDirectory
@@ -85,13 +90,13 @@ func (e *FsEntry) DeleteContents(ctx context.Context, s3 *minio.Client) error {
                e.FsEntry.Content = nil
                return nil
        case e.FsEntry.Content.GetLargeFile() != nil:
-               return e.cleanupOldLargeFile(ctx, s3, e.Content.GetLargeFile())
+               return e.cleanupOldLargeFile(ctx, e.Content.GetLargeFile())
        default:
                return errors.New("file content not populated with any known type")
        }
 }
 
-func (e *FsEntry) readFile() (io.Reader, error) {
+func (e *fsEntryWrapper) readFile() (io.Reader, error) {
        file := e.FsEntry.Content.GetFile()
        if file == nil {
                return nil, errors.New("internal error: readFile() called but file is unpopulated")
@@ -107,7 +112,11 @@ func (e *FsEntry) readFile() (io.Reader, error) {
        }
 }
 
-func (e *FsEntry) readLargeFile(ctx context.Context, s3 *minio.Client) (io.Reader, error) {
+func (e *fsEntryWrapper) readLargeFile(ctx context.Context) (io.Reader, error) {
+       if e.s3Client == nil {
+               return nil, errors.New("s3 client not initialized, unable to read large file")
+       }
+
        if e.Content == nil || e.Content.GetLargeFile() == nil {
                return nil, errors.New("not a large file")
        }
@@ -117,10 +126,10 @@ func (e *FsEntry) readLargeFile(ctx context.Context, s3 *minio.Client) (io.Reade
                return nil, errors.New("large file key contains invalid characters or is the wrong length")
        }
 
-       return s3.GetObject(ctx, s3Bucket, s3Prefix+key, minio.GetObjectOptions{})
+       return e.s3Client.GetObject(ctx, s3Bucket, s3Prefix+key, minio.GetObjectOptions{})
 }
 
-func (e *FsEntry) writeFile(r io.Reader, size uint64) error {
+func (e *fsEntryWrapper) writeFile(r io.Reader, size uint64) error {
        var compression = ephs_proto.FsEntry_File_GZIP
 
        if f := e.FsEntry.Content.GetFile(); f != nil {
@@ -162,11 +171,15 @@ func (e *FsEntry) writeFile(r io.Reader, size uint64) error {
        return nil
 }
 
-func (e *FsEntry) writeLargeFile(ctx context.Context, s3 *minio.Client, r io.Reader, size uint64) error {
+func (e *fsEntryWrapper) writeLargeFile(ctx context.Context, r io.Reader, size uint64) error {
+       if e.s3Client == nil {
+               return errors.New("s3 client not initialized")
+       }
+
        key := newLargeFileKey()
 
        log.Default().Noticef("attempting to PutObject into bucket %s at key %s", s3Bucket, key)
-       upload, err := s3.PutObject(ctx, s3Bucket, s3Prefix+key, r, int64(size), minio.PutObjectOptions{})
+       upload, err := e.s3Client.PutObject(ctx, s3Bucket, s3Prefix+key, r, int64(size), minio.PutObjectOptions{})
        if err != nil {
                log.Default().Error(err)
                return err
@@ -185,13 +198,16 @@ func (e *FsEntry) writeLargeFile(ctx context.Context, s3 *minio.Client, r io.Rea
        return nil
 }
 
-func (e *FsEntry) cleanupOldLargeFile(ctx context.Context, s3 *minio.Client, lf *ephs_proto.FsEntry_LargeFile) error {
+func (e *fsEntryWrapper) cleanupOldLargeFile(ctx context.Context, lf *ephs_proto.FsEntry_LargeFile) error {
+       if e.s3Client == nil {
+               return errors.New("s3 client not initialized")
+       }
        key := lf.GetKey()
        if !RegexpLargeFileKey.MatchString(key) {
                return errors.New("large file key contains invalid characters or is the wrong length")
        }
 
-       return s3.RemoveObject(ctx, s3Bucket, s3Prefix+key, minio.RemoveObjectOptions{})
+       return e.s3Client.RemoveObject(ctx, s3Bucket, s3Prefix+key, minio.RemoveObjectOptions{})
 }
 
 func newLargeFileKey() string {
diff --git a/ephs/ephsll/low_level_client.go b/ephs/ephsll/low_level_client.go
new file mode 100644 (file)
index 0000000..8fd085f
--- /dev/null
@@ -0,0 +1,504 @@
+// Package ephsll is the low-level client for ephs.
+//
+// The low-level client handles all direct interaction with etcd.
+
+package ephsll
+
+import (
+       "context"
+       "fmt"
+       "io"
+       "slices"
+       "sync"
+
+       "github.com/minio/minio-go/v7"
+       "github.com/minio/minio-go/v7/pkg/credentials"
+       etcd_client "go.etcd.io/etcd/client/v3"
+       etcd_concurrency "go.etcd.io/etcd/client/v3/concurrency"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       "go.fuhry.dev/runtime/ephs"
+       "go.fuhry.dev/runtime/mtls"
+       ephs_proto "go.fuhry.dev/runtime/proto/service/ephs"
+       "go.fuhry.dev/runtime/sd"
+       "go.fuhry.dev/runtime/utils/log"
+       "go.fuhry.dev/runtime/utils/option"
+)
+
+type Option = option.Option[*ephsLowLevelClientImpl]
+
+type EphsLowLevelClient interface {
+       Stat(ctx context.Context, path ephs.Path) (*ephs_proto.FsEntry, error)
+       Get(ctx context.Context, path ephs.Path) (*ephs_proto.FsEntry, io.Reader, error)
+       Mkdir(ctx context.Context, path ephs.Path, owner string, recursive bool) error
+       Delete(ctx context.Context, path ephs.Path, recursive bool) error
+       Put(ctx context.Context, path ephs.Path, owner string, version, size uint64, contents io.Reader) (*ephs_proto.FsEntry, error)
+       Watch(ctx context.Context, path ephs.Path) (<-chan *ephs_proto.WatchResponse, error)
+}
+
+type ephsLowLevelClientImpl struct {
+       id       mtls.Identity
+       logger   log.Logger
+       clientMu sync.Mutex
+       clients  map[string]*etcd_client.Client
+       s3Creds  *credentials.Credentials
+       s3Client *minio.Client
+}
+
+var _ EphsLowLevelClient = &ephsLowLevelClientImpl{}
+
+func NewEphsLowLevelClient(id mtls.Identity, opts ...Option) (EphsLowLevelClient, error) {
+       ll := &ephsLowLevelClientImpl{
+               id:      id,
+               logger:  log.WithPrefix("EphsLowLevelClient"),
+               clients: make(map[string]*etcd_client.Client),
+       }
+
+       for _, opt := range opts {
+               if err := opt.Apply(ll); err != nil {
+                       return nil, err
+               }
+       }
+
+       if _, err := ll.clientForCell(ephs.DefaultCell()); err != nil {
+               return nil, err
+       }
+
+       s3, err := ll.newS3Client()
+       if err != nil {
+               return nil, err
+       }
+       ll.s3Client = s3
+
+       return ll, nil
+}
+
+// Stat
+func (c *ephsLowLevelClientImpl) Stat(ctx context.Context, path ephs.Path) (*ephs_proto.FsEntry, error) {
+       ent, err := c.getProtoForPath(ctx, path)
+       if err != nil {
+               return nil, err
+       }
+
+       // Never send file contents in a stat response
+       if _, ok := ent.Content.GetContent().(*ephs_proto.FsEntry_Content_File); ok {
+               ent.Content = &ephs_proto.FsEntry_Content{
+                       Content: &ephs_proto.FsEntry_Content_File{
+                               File: &ephs_proto.FsEntry_File{},
+                       },
+               }
+       }
+
+       return ent, nil
+}
+
+// Get
+func (c *ephsLowLevelClientImpl) Get(ctx context.Context, path ephs.Path) (*ephs_proto.FsEntry, io.Reader, error) {
+       ent, err := c.getProtoForPath(ctx, path)
+       if err != nil {
+               return nil, nil, err
+       }
+
+       if ent.Content.GetDirectory() != nil {
+               return nil, nil, ErrDirectory
+       }
+
+       reader, err := (&fsEntryWrapper{FsEntry: ent, s3Client: c.s3Client}).GetContents(ctx)
+       if err != nil {
+               return nil, nil, err
+       }
+
+       return ent, reader, nil
+}
+
+// Mkdir
+func (c *ephsLowLevelClientImpl) Mkdir(ctx context.Context, path ephs.Path, owner string, recursive bool) error {
+       return c.mkdir(ctx, path, owner, recursive)
+}
+
+// Delete
+func (c *ephsLowLevelClientImpl) Delete(ctx context.Context, ephsPath ephs.Path, recursive bool) error {
+       entry, err := c.getProtoForPath(ctx, ephsPath)
+       if err != nil {
+               return err
+       }
+
+       fse := &fsEntryWrapper{FsEntry: entry, s3Client: c.s3Client}
+       if fse.Content.GetDirectory() != nil {
+               if len(fse.Content.GetDirectory().Entries) > 0 && !recursive {
+                       return status.Errorf(codes.FailedPrecondition,
+                               "refusing to delete non-empty directory %q without recursion flag set",
+                               ephsPath.String())
+               }
+
+               for _, ent := range fse.Content.GetDirectory().Entries {
+                       if err := c.Delete(ctx, ephsPath.Child(ent.Name), recursive); err != nil {
+                               return err
+                       }
+               }
+       } else {
+               if err := fse.DeleteContents(ctx); err != nil {
+                       return err
+               }
+       }
+
+       err = c.modifyDirectory(ctx, ephsPath.Parent(), entry.Owner, func(dEnt *ephs_proto.FsEntry_Directory, stm etcd_concurrency.STM) error {
+               index := -1
+               for i, ent := range dEnt.Entries {
+                       if ent.Name == ephsPath.Basename() {
+                               index = i
+                               break
+                       }
+               }
+               if index >= 0 {
+                       dEnt.Entries = slices.Delete(dEnt.Entries, index, index+1)
+               }
+               stm.Del(ephsPath.EtcdKey())
+               return nil
+       })
+
+       return err
+}
+
+// Put
+func (c *ephsLowLevelClientImpl) Put(ctx context.Context, ephsPath ephs.Path, owner string, version, size uint64, contents io.Reader) (*ephs_proto.FsEntry, error) {
+       _, err := c.getProtoForPath(ctx, ephsPath.Parent())
+       if err != nil {
+               if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
+                       return nil, status.Errorf(
+                               codes.FailedPrecondition,
+                               "cannot write %q: parent directory %q does not exist",
+                               ephsPath.String(), ephsPath.Parent().String())
+               }
+               return nil, err
+       }
+
+       pbEnt, err := c.getProtoForPath(ctx, ephsPath)
+       if err != nil {
+               if status, ok := status.FromError(err); ok && status.Code() == codes.NotFound {
+                       // NotFound is only returned after all access checks have succeeded. Ok to
+                       // create file.
+                       now := timestamppb.Now()
+                       pbEnt = &ephs_proto.FsEntry{
+                               Created:  now,
+                               Modified: now,
+                               Version:  version - 1,
+                               Size:     size,
+                               Owner:    owner,
+                       }
+               } else {
+                       // some other error besides not-found
+                       return nil, err
+               }
+       } else {
+               if version != 0 && version != (pbEnt.Version+1) {
+                       return nil, status.Errorf(
+                               codes.FailedPrecondition,
+                               "version conflict: attempted to overwrite version %d with %d, "+
+                                       "version number must be current version + 1",
+                               pbEnt.Version,
+                               version)
+               }
+       }
+       ent := &fsEntryWrapper{FsEntry: pbEnt, s3Client: c.s3Client}
+       if pbEnt.Content.GetDirectory() != nil {
+               return nil, status.Errorf(codes.FailedPrecondition,
+                       "cannot write %q: path already exists and is a directory",
+                       ephsPath.String())
+       }
+
+       if err := ent.SetContents(ctx, contents, size); err != nil {
+               return nil, err
+       }
+
+       ent.FsEntry.Modified = timestamppb.Now()
+       ent.FsEntry.Size = size
+       ent.FsEntry.Version++
+
+       marshaled, err := proto.Marshal(ent.FsEntry)
+       if err != nil {
+               return nil, err
+       }
+
+       err = c.modifyDirectory(ctx, ephsPath.Parent(), owner, func(dEnt *ephs_proto.FsEntry_Directory, stm etcd_concurrency.STM) error {
+               stm.Put(ephsPath.EtcdKey(), string(marshaled))
+               for _, ent := range dEnt.Entries {
+                       if ent.Name == ephsPath.Basename() {
+                               return nil
+                       }
+               }
+
+               dEnt.Entries = append(dEnt.Entries, &ephs_proto.FsEntry_Directory_DirectoryEntry{
+                       Name:      ephsPath.Basename(),
+                       Directory: false,
+               })
+
+               return nil
+       })
+
+       if err != nil {
+               _ = ent.DeleteContents(ctx)
+               return nil, err
+       }
+
+       return ent.FsEntry, nil
+}
+
+// Watch
+func (c *ephsLowLevelClientImpl) Watch(ctx context.Context, ephsPath ephs.Path) (<-chan *ephs_proto.WatchResponse, error) {
+       ctx = etcd_client.WithRequireLeader(ctx)
+       resp := &ephs_proto.WatchResponse{
+               Entry: nil,
+               Event: ephs_proto.WatchResponse_DELETE,
+       }
+
+       if fse, err := c.getProtoForPath(ctx, ephsPath); err == nil {
+               resp.Entry = fse
+               resp.Event = ephs_proto.WatchResponse_CREATE
+       }
+
+       etcd, err := c.clientForCell(ephsPath.Cell())
+       if err != nil {
+               return nil, err
+       }
+
+       ch := make(chan *ephs_proto.WatchResponse)
+
+       go c.watch(ctx, ephsPath, ch, resp, etcd)
+
+       return ch, nil
+}
+
+func (c *ephsLowLevelClientImpl) watch(ctx context.Context, ephsPath ephs.Path, ch chan *ephs_proto.WatchResponse, resp *ephs_proto.WatchResponse, etcd *etcd_client.Client) {
+       defer close(ch)
+
+       ch <- resp
+
+       for ctx.Err() == nil {
+               watcher := etcd.Watch(ctx, ephsPath.EtcdKey())
+               for msg := range watcher {
+                       c.logger.Debugf("watcher: got event: %+v", msg)
+                       for _, event := range msg.Events {
+                               if string(event.Kv.Key) != ephsPath.EtcdKey() {
+                                       continue
+                               }
+
+                               resp.Entry = &ephs_proto.FsEntry{}
+
+                               if event.IsCreate() {
+                                       resp.Event = ephs_proto.WatchResponse_CREATE
+                               } else if event.IsModify() {
+                                       resp.Event = ephs_proto.WatchResponse_MODIFY
+                               } else if event.Type == etcd_client.EventTypeDelete {
+                                       resp.Entry = nil
+                                       resp.Event = ephs_proto.WatchResponse_DELETE
+                               } else {
+                                       continue
+                               }
+
+                               if resp.Entry != nil {
+                                       if err := proto.Unmarshal(event.Kv.Value, resp.Entry); err != nil {
+                                               c.logger.Errorf("protobuf unmarshal error: %v", err)
+                                               return
+                                       }
+                               }
+
+                               ch <- resp
+                       }
+               }
+       }
+}
+
+func (c *ephsLowLevelClientImpl) mkdir(ctx context.Context, ephsPath ephs.Path, owner string, recursive bool) error {
+       etcd, err := c.clientForCell(ephsPath.Cell())
+       if err != nil {
+               return err
+       }
+
+       txn, err := etcd_concurrency.NewSTM(etcd, func(stm etcd_concurrency.STM) error {
+               var (
+                       dir *ephs_proto.FsEntry = newDir(owner)
+                       err error
+               )
+               for _, ancestor := range ephsPath.Lineage() {
+                       parent := ancestor.Parent()
+                       if ancestor == ancestor.Parent() {
+                               continue
+                       }
+                       dirRaw := []byte(stm.Get(parent.EtcdKey()))
+                       if len(dirRaw) == 0 {
+                               if !recursive {
+                                       return status.Errorf(codes.NotFound,
+                                               "cannot mkdir %q: parent directory %q does not exist and recursion is disabled",
+                                               ephsPath.String(), parent)
+                               }
+
+                               dir = newDir(owner)
+                       } else {
+                               err = proto.Unmarshal(dirRaw, dir)
+                               if err != nil {
+                                       return status.Errorf(codes.Internal,
+                                               "error unmarshaling parent directory %q: %T: %v",
+                                               parent, err, err)
+                               }
+
+                               if dir.Content.GetDirectory() == nil {
+                                       return status.Errorf(
+                                               codes.FailedPrecondition,
+                                               "cannot mkdir %q: parent item %q is not a directory (raw len: %d)",
+                                               ephsPath.String(), parent, len(dirRaw))
+                               }
+
+                               dir.Modified = timestamppb.Now()
+                               dir.Version += 1
+                       }
+
+                       dEnt := dir.Content.GetDirectory()
+
+                       dEnt.Entries = append(dEnt.Entries, &ephs_proto.FsEntry_Directory_DirectoryEntry{
+                               Name:      ancestor.Basename(),
+                               Directory: true,
+                       })
+
+                       dirRaw, err = proto.Marshal(dir)
+                       if err != nil {
+                               return status.Errorf(codes.Internal,
+                                       "error marshaling parent directory %q: %T: %v",
+                                       parent, err, err)
+                       }
+
+                       stm.Put(parent.EtcdKey(), string(dirRaw))
+               }
+
+               entry := []byte(stm.Get(ephsPath.EtcdKey()))
+               if len(entry) > 0 {
+                       return status.Errorf(codes.AlreadyExists,
+                               "cannot create directory %q: already exists",
+                               ephsPath.String())
+               }
+
+               dirRaw, err := proto.Marshal(newDir(owner))
+               if err != nil {
+                       return status.Errorf(codes.Internal,
+                               "error marshaling new directory %q: %T: %v",
+                               ephsPath.String(), err, err)
+               }
+
+               stm.Put(ephsPath.EtcdKey(), string(dirRaw))
+               return nil
+       }, etcd_concurrency.WithAbortContext(ctx))
+       if err != nil {
+               return err
+       }
+       if !txn.Succeeded {
+               return fmt.Errorf("transaction failed: %+v", txn)
+       }
+
+       return nil
+}
+
+func (c *ephsLowLevelClientImpl) modifyDirectory(ctx context.Context, ephsPath ephs.Path, owner string, apply func(*ephs_proto.FsEntry_Directory, etcd_concurrency.STM) error) (err error) {
+       etcd, err := c.clientForCell(ephsPath.Cell())
+       if err != nil {
+               return err
+       }
+
+       txnResp, err := etcd_concurrency.NewSTM(etcd, func(stm etcd_concurrency.STM) error {
+               dirRaw := []byte(stm.Get(ephsPath.EtcdKey()))
+               dir := newDir(owner)
+               if len(dirRaw) > 0 {
+                       if err := proto.Unmarshal(dirRaw, dir); err != nil {
+                               return fmt.Errorf("error unmarshaling FsEntry at %q: %v", ephsPath.String(), err)
+                       }
+               }
+
+               if dir.Content.GetDirectory() == nil {
+                       return fmt.Errorf("ephs path %q: not a directory", ephsPath.String())
+               }
+
+               if err = apply(dir.Content.GetDirectory(), stm); err != nil {
+                       return fmt.Errorf("error performing atomic directory modification: %v", err)
+               }
+
+               dir.Modified = timestamppb.Now()
+               dir.Version += 1
+
+               if dirRaw, err = proto.Marshal(dir); err != nil {
+                       return fmt.Errorf("error re-marshaling FsEntry at %q after calling apply: %v", ephsPath.String(), err)
+               }
+
+               stm.Put(ephsPath.EtcdKey(), string(dirRaw))
+
+               return nil
+       }, etcd_concurrency.WithAbortContext(ctx))
+
+       c.logger.Debugf("transaction status modifying directory %q: %+v", ephsPath.String(), txnResp)
+
+       return err
+}
+
+func (c *ephsLowLevelClientImpl) getProtoForPath(ctx context.Context, path ephs.Path) (*ephs_proto.FsEntry, error) {
+       var err error
+       etcd, err := c.clientForCell(path.Cell())
+       if err != nil {
+               return nil, err
+       }
+
+       obj, err := etcd.Get(ctx, path.EtcdKey())
+       if err != nil {
+               return nil, err
+       }
+
+       if len(obj.Kvs) < 1 {
+               return nil, status.Error(
+                       codes.NotFound,
+                       path.String())
+       }
+
+       entry := &ephs_proto.FsEntry{}
+       if err := proto.Unmarshal(obj.Kvs[0].Value, entry); err != nil {
+               return nil, status.Errorf(
+                       codes.DataLoss,
+                       "failed to unmarshal key %q as %s: %v",
+                       obj.Kvs[0].Key,
+                       proto.MessageName(entry),
+                       err)
+       }
+
+       return entry, nil
+}
+
+func (c *ephsLowLevelClientImpl) clientForCell(cell string) (*etcd_client.Client, error) {
+       c.clientMu.Lock()
+       defer c.clientMu.Unlock()
+
+       if _, ok := c.clients[cell]; !ok {
+               client, err := sd.NewEtcdClient(
+                       c.id,
+                       cell,
+               )
+               if err != nil {
+                       return nil, err
+               }
+               c.clients[cell] = client
+       }
+
+       return c.clients[cell], nil
+}
+
+func newDir(owner string) *ephs_proto.FsEntry {
+       return &ephs_proto.FsEntry{
+               Content: &ephs_proto.FsEntry_Content{
+                       Content: &ephs_proto.FsEntry_Content_Directory{
+                               Directory: &ephs_proto.FsEntry_Directory{},
+                       },
+               },
+               Owner:    owner,
+               Created:  timestamppb.Now(),
+               Modified: timestamppb.Now(),
+               Version:  1,
+       }
+}
similarity index 83%
rename from ephs/servicer/s3.go
rename to ephs/ephsll/s3.go
index 32ce492291d4adc6a8474cfbe62aa55dcb7882a4..d526de6b37f3aa700968d22d32ace3b4e415397a 100644 (file)
@@ -1,4 +1,4 @@
-package servicer
+package ephsll
 
 import (
        "errors"
@@ -15,7 +15,7 @@ var s3Bucket = "ephs"
 var s3Prefix = ""
 
 func WithAWSEnvCredentials() Option {
-       return option.NewOption(func(s *ephsServicer) error {
+       return option.NewOption(func(s *ephsLowLevelClientImpl) error {
                s.s3Creds = credentials.NewEnvAWS()
 
                cc := &credentials.CredContext{
@@ -28,7 +28,7 @@ func WithAWSEnvCredentials() Option {
 }
 
 func WithAWSCredentialFile(filename string) Option {
-       return option.NewOption(func(s *ephsServicer) error {
+       return option.NewOption(func(s *ephsLowLevelClientImpl) error {
                s.s3Creds = credentials.NewFileAWSCredentials(filename, "default")
 
                cc := &credentials.CredContext{
@@ -39,7 +39,7 @@ func WithAWSCredentialFile(filename string) Option {
        })
 }
 
-func (s *ephsServicer) newS3Client() (*minio.Client, error) {
+func (s *ephsLowLevelClientImpl) newS3Client() (*minio.Client, error) {
        if !flag.Parsed() {
                return nil, errors.New("flags not yet parsed")
        }
diff --git a/ephs/path.go b/ephs/path.go
new file mode 100644 (file)
index 0000000..3cf3058
--- /dev/null
@@ -0,0 +1,148 @@
+package ephs
+
+import (
+       "errors"
+       "flag"
+       "path"
+       "strings"
+
+       "go.fuhry.dev/runtime/utils/hostname"
+)
+
+type Path interface {
+       AclPath() string
+       Cell() string
+       EtcdKey() string
+       LiteralCell() string
+       String() string
+       Lineage() []Path
+       // Parent returns the parent directory of this path.
+       //
+       // If this path is the root path of a cell,
+       Parent() Path
+       Parents() []Path
+       Child(string) Path
+       Basename() string
+}
+
+type ephsPath string
+
+const (
+       localCell = "local"
+       cellIndex = 2
+)
+
+var ephsDefaultCell string
+
+var ErrNotEphsPath = errors.New("not an ephs path")
+
+func DefaultCell() string {
+       return ephsDefaultCell
+}
+
+func IsEphsPath(p string) bool {
+       if !strings.HasPrefix(p, KeyPrefix) {
+               return false
+       }
+
+       // "/ephs/cell.example.com/subdir"
+       // 0  1           2          3
+       parts := strings.Split(p, PathSeparator)
+       if len(parts) < 3 {
+               return false
+       }
+
+       // all path components must be non-empty except the first and last
+       for i := 1; i < len(parts)-1; i++ {
+               if parts[i] == "" {
+                       return false
+               }
+       }
+
+       // "." and ".." are not supported as path components
+       for _, c := range parts {
+               if c == "." || c == ".." {
+                       return false
+               }
+       }
+
+       return true
+}
+
+func ParsePath(p string) (Path, error) {
+       if !IsEphsPath(p) {
+               return nil, ErrNotEphsPath
+       }
+
+       return ephsPath(p), nil
+}
+
+func (p ephsPath) AclPath() string {
+       parts := strings.Split(string(p), PathSeparator)
+       return strings.Join(parts[3:], PathSeparator)
+}
+
+func (p ephsPath) Basename() string {
+       return path.Base(string(p))
+}
+
+func (p ephsPath) Cell() string {
+       cell := p.LiteralCell()
+       if cell == localCell {
+               return ephsDefaultCell
+       }
+       return cell
+}
+
+func (p ephsPath) LiteralCell() string {
+       parts := strings.Split(string(p), PathSeparator)
+       return parts[cellIndex]
+}
+
+func (p ephsPath) EtcdKey() string {
+       parts := strings.Split(string(p), PathSeparator)
+       if parts[cellIndex] == localCell {
+               parts[cellIndex] = ephsDefaultCell
+       }
+
+       return strings.Join(parts, PathSeparator)
+}
+
+func (p ephsPath) Parent() Path {
+       parts := strings.Split(string(p), PathSeparator)
+       if len(parts) == cellIndex+1 {
+               return p
+       }
+
+       parent := strings.Join(parts[:len(parts)-1], PathSeparator)
+       return ephsPath(parent)
+}
+
+func (p ephsPath) Parents() []Path {
+       var parents []Path
+       parts := strings.Split(string(p), PathSeparator)
+       for i := 3; i < len(parts); i++ {
+               parents = append(parents, ephsPath(strings.Join(parts[:i], PathSeparator)))
+       }
+
+       return parents
+}
+
+func (p ephsPath) Lineage() []Path {
+       lineage := p.Parents()
+       lineage = append(lineage, p)
+       return lineage
+}
+
+func (p ephsPath) Child(name string) Path {
+       return ephsPath(string(p) + PathSeparator + name)
+}
+
+func (p ephsPath) String() string {
+       return string(p)
+}
+
+func init() {
+       flag.StringVar(&defaultClientAddr, "ephs.addr", "", "comma-separated list of server addresses for ephs; leave blank to use service discovery")
+       flag.StringVar(&ephsDefaultCell, "ephs.cell", hostname.DomainName(), "ephs cell")
+}
index cff5bc5228a9ac4ce3cb1d9c9b9120b58eba4b33..17213fb3f4119c8f23dba908d20a55a752d47e79 100644 (file)
@@ -4,36 +4,27 @@ go_library(
     name = "servicer",
     srcs = [
         "acl.go",
-        "fs_object.go",
-        "s3.go",
         "servicer.go",
         "writer.go",
     ],
     importpath = "go.fuhry.dev/runtime/ephs/servicer",
     visibility = ["//visibility:public"],
     deps = [
-        "//constants",
         "//ephs",
+        "//ephs/ephsll",
         "//grpc",
         "//mtls",
         "//mtls/fsnotify",
         "//proto/service/ephs",
-        "//sd",
-        "//utils/hostname",
         "//utils/log",
         "//utils/option",
         "//utils/stringmatch",
-        "@com_github_minio_minio_go_v7//:minio-go",
-        "@com_github_minio_minio_go_v7//pkg/credentials",
         "@in_gopkg_yaml_v3//:yaml_v3",
-        "@io_etcd_go_etcd_client_v3//:client",
-        "@io_etcd_go_etcd_client_v3//concurrency",
         "@org_golang_google_grpc//codes",
         "@org_golang_google_grpc//peer",
         "@org_golang_google_grpc//status",
         "@org_golang_google_protobuf//proto",
         "@org_golang_google_protobuf//types/known/emptypb",
-        "@org_golang_google_protobuf//types/known/timestamppb",
     ],
 )
 
index 982504fbefcee1458550e2ddb3dd67032f848cda..59eb0bf4af1755e9902e29ffc2f84ebe250a852b 100644 (file)
@@ -1,35 +1,29 @@
+// Package servicer implements the gRPC servicer for ephs.
+//
+// The servicer implements all access control logic.
+//
+// Interaction with etcd takes place in the `go.fuhry.dev/runtime/ephs/ephsll` package.
+
 package servicer
 
 import (
        "bytes"
        "context"
        "errors"
-       "flag"
-       "fmt"
        "io"
-       "path"
-       "slices"
-       "strings"
-       "sync"
-
-       "github.com/minio/minio-go/v7"
-       "github.com/minio/minio-go/v7/pkg/credentials"
-       etcd_client "go.etcd.io/etcd/client/v3"
-       etcd_concurrency "go.etcd.io/etcd/client/v3/concurrency"
+
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/peer"
        "google.golang.org/grpc/status"
        "google.golang.org/protobuf/proto"
        "google.golang.org/protobuf/types/known/emptypb"
-       "google.golang.org/protobuf/types/known/timestamppb"
 
        "go.fuhry.dev/runtime/ephs"
+       "go.fuhry.dev/runtime/ephs/ephsll"
        "go.fuhry.dev/runtime/grpc"
        "go.fuhry.dev/runtime/mtls"
        "go.fuhry.dev/runtime/mtls/fsnotify"
        ephs_proto "go.fuhry.dev/runtime/proto/service/ephs"
-       "go.fuhry.dev/runtime/sd"
-       "go.fuhry.dev/runtime/utils/hostname"
        "go.fuhry.dev/runtime/utils/log"
        "go.fuhry.dev/runtime/utils/option"
 )
@@ -43,12 +37,10 @@ type msgWithPath interface {
 type ephsServicer struct {
        ephs_proto.EphsServer
 
-       acl      *Acl
-       logger   log.Logger
-       clientMu sync.Mutex
-       clients  map[string]*etcd_client.Client
-       s3Creds  *credentials.Credentials
-       s3Client *minio.Client
+       acl    *Acl
+       logger log.Logger
+       ll     ephsll.EphsLowLevelClient
+       llOpts []ephsll.Option
 }
 
 var _ ephs_proto.EphsServer = &ephsServicer{}
@@ -92,185 +84,83 @@ func WithAclFile(aclFile string) Option {
        })
 }
 
+func WithAWSEnvCredentials() Option {
+       return option.NewOption(func(s *ephsServicer) error {
+               s.llOpts = append(s.llOpts, ephsll.WithAWSEnvCredentials())
+               return nil
+       })
+}
+
+func WithAWSCredentialFile(filename string) Option {
+       return option.NewOption(func(s *ephsServicer) error {
+               s.llOpts = append(s.llOpts, ephsll.WithAWSCredentialFile(filename))
+               return nil
+       })
+}
+
 func NewEphsServicer(opts ...Option) (ephs_proto.EphsServer, error) {
        serv := &ephsServicer{
-               clients: make(map[string]*etcd_client.Client),
-               logger:  log.Default().WithPrefix("ephsServicer"),
-       }
-       if _, err := serv.clientForCell(cell); err != nil {
-               return nil, err
+               logger: log.Default().WithPrefix("ephsServicer"),
        }
+
        for _, o := range opts {
                if err := o.Apply(serv); err != nil {
                        return nil, err
                }
        }
 
-       s3, err := serv.newS3Client()
+       ll, err := ephsll.NewEphsLowLevelClient(mtls.DefaultIdentity(), serv.llOpts...)
        if err != nil {
                return nil, err
        }
-       serv.s3Client = s3
+       serv.ll = ll
 
        return serv, nil
 }
 
-func (s *ephsServicer) clientForCell(cell string) (*etcd_client.Client, error) {
-       s.clientMu.Lock()
-       defer s.clientMu.Unlock()
-
-       if _, ok := s.clients[cell]; !ok {
-               client, err := sd.NewEtcdClient(
-                       mtls.DefaultIdentity(),
-                       cell,
-               )
-               if err != nil {
-                       return nil, err
-               }
-               s.clients[cell] = client
-       }
-
-       return s.clients[cell], nil
-}
-
-func (s *ephsServicer) checkPath(ctx context.Context, path string) (string, error) {
-       if !ephs.IsEphsPath(path) {
-               return "", status.Errorf(codes.InvalidArgument, "path %q must start with "+ephs.KeyPrefix, path)
-       }
-
-       parts := strings.Split(path, "/")
-
-       if len(parts) < 3 {
-               return "", status.Errorf(
+func (s *ephsServicer) checkPath(ctx context.Context, p string) (ephs.Path, error) {
+       ephsPath, err := ephs.ParsePath(p)
+       if err != nil {
+               return nil, status.Errorf(
                        codes.InvalidArgument,
-                       "path must be at least 3 levels deep: %q",
-                       path)
-       }
-
-       for i, part := range parts {
-               if (i > 0 && part == "") || part == "." || part == ".." {
-                       return "", status.Errorf(codes.InvalidArgument,
-                               "path %q is invalid: may not contain empty elements, '.' or '..'", path)
-               }
+                       "failed to parse ephs path %q: %v",
+                       p, err)
        }
 
        ident, err := identityFromContext(ctx)
        if err != nil {
-               return "", err
-       }
-
-       aclPath := strings.Join(parts[3:], "/")
-
-       if parts[2] == "local" {
-               parts[2] = cell
+               return nil, err
        }
 
        if s.acl != nil {
-               if !s.acl.Check(ident, aclPath) {
-                       return "", status.Errorf(
+               if !s.acl.Check(ident, ephsPath.AclPath()) {
+                       return nil, status.Errorf(
                                codes.PermissionDenied,
                                "access to path %q denied for %q (rule path: %q)",
-                               strings.Join(parts, "/"),
+                               ephsPath.String(),
                                ident,
-                               aclPath)
+                               ephsPath.AclPath())
                }
        }
 
-       return strings.Join(parts, "/"), nil
+       return ephsPath, nil
 }
 
 func (s *ephsServicer) Stat(ctx context.Context, req *ephs_proto.GetRequest) (*ephs_proto.StatResponse, error) {
        s.logRequest(ctx, "Stat", req)
-       entry, err := s.getPath(ctx, req.Path)
+       ephsPath, err := s.checkPath(ctx, req.Path)
        if err != nil {
                return nil, err
        }
 
-       return &ephs_proto.StatResponse{
-               Entry: entry,
-       }, nil
-}
-
-func (s *ephsServicer) getPath(ctx context.Context, path string) (*ephs_proto.FsEntry, error) {
-       var err error
-       if path, err = s.checkPath(ctx, path); err != nil {
-               return nil, err
-       }
-
-       cell := strings.Split(path, "/")[2]
-       etcd, err := s.clientForCell(cell)
-       if err != nil {
-               return nil, err
-       }
-
-       obj, err := etcd.Get(ctx, path)
+       entry, err := s.ll.Stat(ctx, ephsPath)
        if err != nil {
                return nil, err
        }
 
-       if len(obj.Kvs) < 1 {
-               return nil, status.Error(
-                       codes.NotFound,
-                       path)
-       }
-
-       entry := &ephs_proto.FsEntry{}
-       if err := proto.Unmarshal(obj.Kvs[0].Value, entry); err != nil {
-               return nil, status.Errorf(
-                       codes.DataLoss,
-                       "failed to unmarshal key %q as %s: %v",
-                       obj.Kvs[0].Key,
-                       proto.MessageName(entry),
-                       err)
-       }
-
-       return entry, nil
-}
-
-func (s *ephsServicer) modifyDirectory(ctx context.Context, path string, apply func(*ephs_proto.FsEntry_Directory, etcd_concurrency.STM) error) error {
-       var err error
-       if path, err = s.checkPath(ctx, path); err != nil {
-               return err
-       }
-
-       cell := strings.Split(path, "/")[2]
-       etcd, err := s.clientForCell(cell)
-       if err != nil {
-               return err
-       }
-
-       txnResp, err := etcd_concurrency.NewSTM(etcd, func(stm etcd_concurrency.STM) error {
-               dirRaw := []byte(stm.Get(path))
-               dir := newDir(ctx)
-               if len(dirRaw) > 0 {
-                       if err := proto.Unmarshal(dirRaw, dir); err != nil {
-                               return fmt.Errorf("error unmarshaling FsEntry at %q: %v", path, err)
-                       }
-               }
-
-               if dir.Content.GetDirectory() == nil {
-                       return fmt.Errorf("ephs path %q: not a directory", path)
-               }
-
-               if err = apply(dir.Content.GetDirectory(), stm); err != nil {
-                       return fmt.Errorf("error performing atomic directory modification: %v", err)
-               }
-
-               dir.Modified = timestamppb.Now()
-               dir.Version += 1
-
-               if dirRaw, err = proto.Marshal(dir); err != nil {
-                       return fmt.Errorf("error re-marshaling FsEntry at %q after calling apply: %v", path, err)
-               }
-
-               stm.Put(path, string(dirRaw))
-
-               return nil
-       })
-
-       s.logger.Debugf("transaction status modifying directory %q: %+v", path, txnResp)
-
-       return err
+       return &ephs_proto.StatResponse{
+               Entry: entry,
+       }, nil
 }
 
 func (s *ephsServicer) logRequest(ctx context.Context, method string, req msgWithPath) {
@@ -286,32 +176,31 @@ func (s *ephsServicer) Get(req *ephs_proto.GetRequest, server ephs_proto.Ephs_Ge
        ctx := server.Context()
        s.logRequest(ctx, "Get", req)
 
-       entry, err := s.getPath(ctx, req.Path)
+       ephsPath, err := s.checkPath(ctx, req.Path)
        if err != nil {
                return err
        }
 
-       if entry.Content.GetDirectory() != nil {
-               return status.Errorf(
-                       codes.FailedPrecondition,
-                       "%q: is a directory",
-                       req.Path)
+       entry, reader, err := s.ll.Get(ctx, ephsPath)
+       if err != nil {
+               if errors.Is(err, ephsll.ErrDirectory) {
+                       return status.Errorf(
+                               codes.FailedPrecondition,
+                               "%q: is a directory",
+                               req.Path)
+               }
+
+               return err
        }
 
-       entryCopy := *entry
+       entryCopy := proto.Clone(entry).(*ephs_proto.FsEntry)
 
        response := &ephs_proto.GetResponse{
-               Entry: &entryCopy,
+               Entry: entryCopy,
        }
 
        response.Entry.Content = nil
 
-       ent := &FsEntry{entry}
-       reader, err := ent.GetContents(ctx, s.s3Client)
-       if err != nil {
-               return err
-       }
-
        writer := &responseWriter{server: server, response: response}
        n, err := io.Copy(writer, reader)
        if err != nil {
@@ -323,154 +212,25 @@ func (s *ephsServicer) Get(req *ephs_proto.GetRequest, server ephs_proto.Ephs_Ge
        return nil
 }
 
-func (s *ephsServicer) mkdir(ctx context.Context, pathName string, recursive bool) error {
-       parts := strings.Split(pathName, "/")
-
-       cell := parts[2]
-       etcd, err := s.clientForCell(cell)
-       if err != nil {
-               return err
-       }
-
-       txn, err := etcd_concurrency.NewSTM(etcd, func(stm etcd_concurrency.STM) error {
-               var (
-                       dir *ephs_proto.FsEntry = newDir(ctx)
-                       err error
-               )
-               for i := 3; i < len(parts); i++ {
-                       parent := strings.Join(parts[:i], "/")
-
-                       dirRaw := []byte(stm.Get(parent))
-                       if len(dirRaw) == 0 {
-                               if !recursive {
-                                       return status.Errorf(codes.NotFound, "cannot mkdir %q: parent directory %q does not exist and recursion is disabled", pathName, parent)
-                               }
-
-                               dir = newDir(ctx)
-                       } else {
-                               err = proto.Unmarshal(dirRaw, dir)
-                               if err != nil {
-                                       return status.Errorf(codes.Internal,
-                                               "error unmarshaling parent directory %q: %T: %v",
-                                               parent, err, err)
-                               }
-
-                               if dir.Content.GetDirectory() == nil {
-                                       return status.Errorf(
-                                               codes.FailedPrecondition,
-                                               "cannot mkdir %q: parent item %q is not a directory (raw len: %d)",
-                                               pathName, parent, len(dirRaw))
-                               }
-
-                               dir.Modified = timestamppb.Now()
-                               dir.Version += 1
-                       }
-
-                       dEnt := dir.Content.GetDirectory()
-
-                       dEnt.Entries = append(dEnt.Entries, &ephs_proto.FsEntry_Directory_DirectoryEntry{
-                               Name:      parts[i],
-                               Directory: true,
-                       })
-
-                       dirRaw, err = proto.Marshal(dir)
-                       if err != nil {
-                               return status.Errorf(codes.Internal,
-                                       "error marshaling parent directory %q: %T: %v",
-                                       parent, err, err)
-                       }
-
-                       stm.Put(parent, string(dirRaw))
-               }
-
-               entry := []byte(stm.Get(pathName))
-               if len(entry) > 0 {
-                       return status.Errorf(codes.AlreadyExists,
-                               "cannot create directory %q: already exists",
-                               pathName)
-               }
-
-               dirRaw, err := proto.Marshal(newDir(ctx))
-               if err != nil {
-                       return status.Errorf(codes.Internal,
-                               "error marshaling new directory %q: %T: %v",
-                               pathName, err, err)
-               }
-
-               stm.Put(pathName, string(dirRaw))
-               return nil
-       })
-       if err != nil {
-               return err
-       }
-       if !txn.Succeeded {
-               return fmt.Errorf("transaction failed: %+v", txn)
-       }
-
-       return nil
-}
-
 func (s *ephsServicer) MkDir(ctx context.Context, req *ephs_proto.MkDirRequest) (*emptypb.Empty, error) {
        path, err := s.checkPath(ctx, req.Path)
        if err != nil {
                return nil, err
        }
 
-       return &emptypb.Empty{}, s.mkdir(ctx, path, req.Recursive)
+       whoami, _ := identityFromContext(ctx)
+
+       return &emptypb.Empty{}, s.ll.Mkdir(ctx, path, whoami, req.Recursive)
 }
 
 func (s *ephsServicer) Delete(ctx context.Context, req *ephs_proto.DeleteRequest) (*emptypb.Empty, error) {
        s.logRequest(ctx, "Delete", req)
-       targetPath, err := s.checkPath(ctx, req.Path)
-       if err != nil {
-               return nil, err
-       }
-
-       entry, err := s.getPath(ctx, targetPath)
+       ephsPath, err := s.checkPath(ctx, req.Path)
        if err != nil {
                return nil, err
        }
 
-       fse := &FsEntry{entry}
-       if fse.Content.GetDirectory() != nil {
-               if len(fse.Content.GetDirectory().Entries) > 0 && !req.Recursive {
-                       return nil, status.Errorf(codes.FailedPrecondition,
-                               "refusing to delete non-empty directory %q without recursion flag set",
-                               targetPath)
-               }
-
-               for _, ent := range fse.Content.GetDirectory().Entries {
-                       subReq := &ephs_proto.DeleteRequest{
-                               Path:      path.Join(targetPath, ent.Name),
-                               Recursive: true,
-                       }
-                       _, err = s.Delete(ctx, subReq)
-                       if err != nil {
-                               return nil, err
-                       }
-               }
-       } else {
-               if err := fse.DeleteContents(ctx, s.s3Client); err != nil {
-                       return nil, err
-               }
-       }
-
-       dir := path.Dir(targetPath)
-       err = s.modifyDirectory(ctx, dir, func(dEnt *ephs_proto.FsEntry_Directory, stm etcd_concurrency.STM) error {
-               index := -1
-               for i, ent := range dEnt.Entries {
-                       if ent.Name == path.Base(targetPath) {
-                               index = i
-                               break
-                       }
-               }
-               if index >= 0 {
-                       dEnt.Entries = slices.Delete(dEnt.Entries, index, index+1)
-               }
-               stm.Del(targetPath)
-               return nil
-       })
-       if err != nil {
+       if err := s.ll.Delete(ctx, ephsPath, req.Recursive); err != nil {
                return nil, err
        }
 
@@ -491,170 +251,47 @@ func (s *ephsServicer) Put(server ephs_proto.Ephs_PutServer) error {
                return err
        }
 
-       targetPath, err := s.checkPath(ctx, msg.Path)
-       if err != nil {
-               return err
-       }
-
-       dir := path.Dir(targetPath)
-       _, err = s.getPath(ctx, dir)
+       ephsPath, err := s.checkPath(ctx, msg.Path)
        if err != nil {
-               if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
-                       return status.Errorf(
-                               codes.FailedPrecondition,
-                               "cannot write %q: parent directory %q does not exist",
-                               targetPath, dir)
-               }
                return err
        }
 
-       pbEnt, err := s.getPath(ctx, targetPath)
-       if err != nil {
-               if status, ok := status.FromError(err); ok && status.Code() == codes.NotFound {
-                       // NotFound is only returned after all access checks have succeeded. Ok to
-                       // create file.
-                       now := timestamppb.Now()
-                       pbEnt = &ephs_proto.FsEntry{
-                               Created:  now,
-                               Modified: now,
-                               Version:  msg.Version - 1,
-                               Size:     msg.Size,
-                               Owner:    peerId,
-                       }
-               } else {
-                       // some other error besides not-found
-                       return err
-               }
-       } else {
-               if msg.Version != 0 && msg.Version != (pbEnt.Version+1) {
-                       return status.Errorf(
-                               codes.FailedPrecondition,
-                               "version conflict: attempted to overwrite version %d with %d, "+
-                                       "version number must be current version + 1",
-                               pbEnt.Version,
-                               msg.Version)
-               }
-       }
-       ent := &FsEntry{pbEnt}
-       if pbEnt.Content.GetDirectory() != nil {
-               return status.Errorf(codes.FailedPrecondition,
-                       "cannot write %q: path already exists and is a directory",
-                       targetPath)
-       }
-
        reader := &putReader{
                initialMsg: msg,
                server:     server,
                buf:        &bytes.Buffer{},
        }
 
-       if err := ent.SetContents(ctx, s.s3Client, reader, msg.Size); err != nil {
-               return err
-       }
-
-       ent.FsEntry.Modified = timestamppb.Now()
-       ent.FsEntry.Size = msg.Size
-       ent.FsEntry.Version++
-
-       marshaled, err := proto.Marshal(ent.FsEntry)
-       if err != nil {
-               return err
-       }
-
-       err = s.modifyDirectory(ctx, dir, func(dEnt *ephs_proto.FsEntry_Directory, stm etcd_concurrency.STM) error {
-               stm.Put(targetPath, string(marshaled))
-               base := path.Base(targetPath)
-               for _, ent := range dEnt.Entries {
-                       if ent.Name == base {
-                               return nil
-                       }
-               }
-
-               dEnt.Entries = append(dEnt.Entries, &ephs_proto.FsEntry_Directory_DirectoryEntry{
-                       Name:      base,
-                       Directory: false,
-               })
-
-               return nil
-       })
-
-       if err != nil {
-               _ = ent.DeleteContents(ctx, s.s3Client)
-               return err
-       }
+       ent, err := s.ll.Put(ctx, ephsPath, peerId, msg.Version, msg.Size, reader)
 
        response := &ephs_proto.StatResponse{
-               Entry: ent.FsEntry,
+               Entry: ent,
        }
        return server.SendAndClose(response)
 }
 
 func (s *ephsServicer) Watch(req *ephs_proto.GetRequest, server ephs_proto.Ephs_WatchServer) error {
-       ctx := etcd_client.WithRequireLeader(server.Context())
+       ctx := server.Context()
        s.logRequest(ctx, "Watch", req)
-       path, err := s.checkPath(ctx, req.Path)
-       if err != nil {
-               return err
-       }
-
-       response := &ephs_proto.WatchResponse{
-               Entry: nil,
-               Event: ephs_proto.WatchResponse_DELETE,
-       }
-
-       if resp, err := s.getPath(ctx, path); err == nil {
-               response.Entry = resp
-               response.Event = ephs_proto.WatchResponse_CREATE
-       }
-
-       cell := strings.Split(path, "/")[2]
-       etcd, err := s.clientForCell(cell)
+       ephsPath, err := s.checkPath(ctx, req.Path)
        if err != nil {
                return err
        }
 
-       if err := server.Send(response); err != nil {
-               return err
-       }
-
        for ctx.Err() == nil {
-               watcher := etcd.Watch(ctx, path)
-               for msg := range watcher {
-                       s.logger.Debugf("watcher: got event: %+v", msg)
-                       for _, event := range msg.Events {
-                               if string(event.Kv.Key) != path {
-                                       continue
-                               }
-
-                               response.Entry = &ephs_proto.FsEntry{}
-
-                               if event.IsCreate() {
-                                       response.Event = ephs_proto.WatchResponse_CREATE
-                               } else if event.IsModify() {
-                                       response.Event = ephs_proto.WatchResponse_MODIFY
-                               } else if event.Type == etcd_client.EventTypeDelete {
-                                       response.Entry = nil
-                                       response.Event = ephs_proto.WatchResponse_DELETE
-                               } else {
-                                       continue
-                               }
-
-                               if response.Entry != nil {
-                                       if err = proto.Unmarshal(event.Kv.Value, response.Entry); err != nil {
-                                               s.logger.Errorf("protobuf marshal error: %v", err)
-                                               return err
-                                       }
-                               }
-
-                               if err := server.Send(response); err != nil {
-                                       s.logger.Errorf("stream send error: %v", err)
-                                       return err
-                               }
+               ch, err := s.ll.Watch(ctx, ephsPath)
+               if err != nil {
+                       return err
+               }
+
+               for msg := range ch {
+                       if err := server.Send(msg); err != nil {
+                               return err
                        }
                }
        }
 
-       s.logger.Noticef("Watch(%q) ending: ctx err: %v", path, ctx.Err())
+       s.logger.V(1).Debugf("Watch(%q) ending: ctx err: %v", ephsPath.String(), ctx.Err())
 
        if errors.Is(ctx.Err(), context.Canceled) {
                return status.Error(codes.Unavailable, "server is shutting down, please reconnect")
@@ -676,23 +313,3 @@ func identityFromContext(ctx context.Context) (string, error) {
 
        return ident.String(), nil
 }
-
-func newDir(ctx context.Context) *ephs_proto.FsEntry {
-       whoami, _ := identityFromContext(ctx)
-
-       return &ephs_proto.FsEntry{
-               Content: &ephs_proto.FsEntry_Content{
-                       Content: &ephs_proto.FsEntry_Content_Directory{
-                               Directory: &ephs_proto.FsEntry_Directory{},
-                       },
-               },
-               Owner:    whoami,
-               Created:  timestamppb.Now(),
-               Modified: timestamppb.Now(),
-               Version:  1,
-       }
-}
-
-func init() {
-       flag.StringVar(&cell, "ephs.cell", hostname.DomainName(), "ephs cell")
-}
index 6cccc6ca059c621b7e7a96154c2008281aad059a..4d3d6d7a1985b6686217e519357afcdff3388429 100644 (file)
@@ -4,6 +4,7 @@ import (
        "bytes"
        "fmt"
 
+       "go.fuhry.dev/runtime/ephs/ephsll"
        ephs_proto "go.fuhry.dev/runtime/proto/service/ephs"
        "go.fuhry.dev/runtime/utils/log"
 )
@@ -15,7 +16,7 @@ type responseWriter struct {
 
 func (w *responseWriter) Write(p []byte) (int, error) {
        log.Default().V(3).Debugf("responseWriter: writing %d bytes", len(p))
-       chunkLen := min(len(p), LargeFileThreshold)
+       chunkLen := min(len(p), ephsll.LargeFileThreshold)
 
        w.response.Chunk = make([]byte, chunkLen)
        copy(w.response.Chunk, p)
index 767ab9242757166d4d4c15db7c87c3c7bc3fdf9c..40a50e867e0806d6fa82ddcc2409b35b811f2ad3 100644 (file)
@@ -21,6 +21,7 @@ go_library(
         "//net/dns",
         "//sd",
         "//utils/context",
+        "//utils/log",
         "//utils/option",
         "@org_golang_google_grpc//:grpc",
     ],
index 5551948b84e39a14fb093384c6e329c00c3ddd71..990e7c7a442f6065b5c031db0522041a813aa7c9 100644 (file)
@@ -4,10 +4,13 @@ import (
        "context"
        "fmt"
        "net"
+       "strconv"
 
        "go.fuhry.dev/runtime/grpc/internal/common"
        "go.fuhry.dev/runtime/mtls"
+       "go.fuhry.dev/runtime/net/dns"
        "go.fuhry.dev/runtime/sd"
+       "go.fuhry.dev/runtime/utils/log"
        "go.fuhry.dev/runtime/utils/option"
        "google.golang.org/grpc"
 )
@@ -27,6 +30,7 @@ type AddressProvider interface {
 
 type client struct {
        ctx      context.Context
+       logger   log.Logger
        serverId mtls.Identity
        clientId mtls.Identity
        watcher  AddressProvider
@@ -55,24 +59,48 @@ func WithAddressProvider(ap AddressProvider) ClientOption {
        })
 }
 
-func WithStaticAddress(addresses ...*net.TCPAddr) ClientOption {
-       var addrs []sd.ServiceAddress
-
-       for _, addr := range addresses {
-               var ip4, ip6 string
-               if len(addr.IP) == 4 {
-                       ip4 = addr.IP.String()
-               } else {
-                       ip6 = addr.IP.String()
-               }
-               addrs = append(addrs, sd.ServiceAddress{
-                       IP4:  ip4,
-                       IP6:  ip6,
-                       Port: uint16(addr.Port),
-               })
-       }
-
+func WithStaticAddress(addresses ...string) ClientOption {
        return option.NewOption(func(c *client) error {
+               var addrs []sd.ServiceAddress
+
+               for i, addr := range addresses {
+                       host, port, err := net.SplitHostPort(addr)
+                       if err != nil {
+                               return fmt.Errorf("failed to parse host:port %q at index %d: %v", addr, i, err)
+                       }
+
+                       portNumber, err := net.LookupPort("tcp", port)
+                       if err != nil {
+                               return fmt.Errorf("failed to lookup named port %q at index %d: %v", port, i, err)
+                       }
+
+                       var ip4, ip6 string
+                       if ip := net.ParseIP(host); ip != nil {
+                               if len(ip) == 4 {
+                                       ip4 = ip.String()
+                               } else {
+                                       ip6 = ip.String()
+                               }
+                       } else {
+                               ip4, ip6, err = dns.ResolveDualStack(host)
+                               if err != nil {
+                                       return fmt.Errorf("failed to lookup host %q at index %d: %v", host, i, err)
+                               }
+                       }
+
+                       if ip4 == "" && ip6 == "" {
+                               return fmt.Errorf(
+                                       "failed to parse address %q at index %d: IPv4 and IPv6 addresses both empty",
+                                       addr, i)
+                       }
+
+                       addrs = append(addrs, sd.ServiceAddress{
+                               Hostname: host,
+                               IP4:      ip4,
+                               IP6:      ip6,
+                               Port:     uint16(portNumber),
+                       })
+               }
                c.watcher = &staticAddressProvider{
                        addresses: addrs,
                }
@@ -94,6 +122,7 @@ func WithDNSSRV() ClientOption {
 func NewGrpcClient(ctx context.Context, serverId string, clientId mtls.Identity, opts ...ClientOption) (Client, error) {
        cl := &client{
                ctx:      ctx,
+               logger:   log.Default().WithPrefix(fmt.Sprintf("grpc.Client[%s]", serverId)),
                serverId: mtls.NewRemoteServiceIdentity(serverId),
                clientId: clientId,
                connFac:  common.NewDefaultConnectionFactory(),
@@ -145,7 +174,8 @@ func (c *client) Conn() (*grpc.ClientConn, error) {
                dialer,
        }
 
-       target := fmt.Sprintf("%s:%d", addrs[0].Hostname, addrs[0].Port)
+       target := net.JoinHostPort(addrs[0].Hostname, strconv.Itoa(int(addrs[0].Port)))
+       c.logger.V(1).Infof("establishing conn to %s", target)
        conn, err := grpc.DialContext(c.ctx, target, opts...)
        if err != nil {
                return nil, err