Make the ephs servicer act as a front-end to the ephs low-level client library, which handles all direct interaction with etcd.
- Move all ephs etcd interaction from `ephs/servicer` to `ephs/ephsll`
- Add ephs.Path type to make ephs path transformations simpler
- Refactor grpc's `WithStaticAddress` option to support DNS names
},
}
- if err := cmd.Run(ctx, os.Args); err != nil {
+ args := os.Args[:1]
+ args = append(args, flag.Args()...)
+
+ if err := cmd.Run(ctx, args); err != nil {
log.Fatal(err)
os.Exit(1)
}
import (
"flag"
+ "os"
"go.fuhry.dev/runtime/ephs/servicer"
"go.fuhry.dev/runtime/grpc"
func main() {
var err error
+ mtls.SetDefaultIdentity("ephs")
+
acl := flag.String("ephs.acl", "", "YAML file containing ACLs for ephs")
awsFile := flag.String("ephs.s3-creds-file", "", "file to load AWS S3 credentials from")
awsEnv := flag.Bool("ephs.s3-creds-env", false, "set to true to load AWS credentials from environment")
serverIdentity := mtls.DefaultIdentity()
s, err := grpc.NewGrpcServer(serverIdentity, grpc.WithTransport(&grpc.QUICConnectionFactory{}))
if err != nil {
- panic(err)
+ log.Fatal(err)
+ os.Exit(1)
}
var opts []servicer.Option
serv, err := servicer.NewEphsServicer(opts...)
if err != nil {
log.Fatal(err)
+ os.Exit(1)
}
ctx, cancel := context.Interruptible()
ephs_pb.RegisterEphsServer(s, serv)
})
if err != nil {
- panic(err)
+ log.Fatal(err)
+ os.Exit(1)
}
defer s.Stop()
import (
"flag"
- "net"
"os"
- "strconv"
"time"
"go.fuhry.dev/runtime/grpc"
var opts []grpc.ClientOption
if *serverAddr != "" {
- host, port, err := net.SplitHostPort(*serverAddr)
- if err != nil {
- log.Default().Panic(err)
- }
-
- portInt, err := strconv.Atoi(port)
- if err != nil {
- portInt, err = net.LookupPort("tcp", port)
- if err != nil {
- log.Default().Panic(err)
- }
- }
-
- if ip := net.ParseIP(host); ip == nil {
- log.Default().Panicf("%q: not a valid IPv4 or IPv6 address", host)
- } else {
- opts = append(opts, grpc.WithStaticAddress(&net.TCPAddr{ip, portInt, ""}))
- }
+ opts = append(opts, grpc.WithStaticAddress(*serverAddr))
}
ctx, cancel := context.Interruptible()
go_library(
name = "ephs",
- srcs = ["client.go"],
+ srcs = [
+ "client.go",
+ "path.go",
+ ],
importpath = "go.fuhry.dev/runtime/ephs",
visibility = ["//visibility:public"],
deps = [
"//mtls",
"//proto/service/ephs",
"//utils/context",
+ "//utils/hostname",
"//utils/log",
"//utils/option",
"@com_github_quic_go_quic_go//:quic-go",
+// Package ephs is the public-facing, high-level client library for interacting with ephs.
+//
+// ephs (pronounced as "effs") is "ephemeral filesystem." It is a filesystem implemented on top of
+// etcd and S3-compatible object storage, with object storage being used for files above a certain
+// size threshold. Presently this is 256 KiB or one chunk.
+//
+// Secure deployment of ephs requires either a dedicated etcd instance or careful configuration of
+// etcd access such that reads and writes of keys beginning with `/ephs/` are restricted to the ephs
+// gRPC service.
+
package ephs
import (
)
const KeyPrefix = "/ephs/"
+const PathSeparator = "/"
const ChunkSize = 262144
const formatEntryDateFormat = "Monday, 2 Jan 2006 15:04:05 -0700"
defaultTimeout time.Duration
id mtls.Identity
- client grpc.Client
- conn *grpc.ClientConn
+ grpcOpts []grpc.ClientOption
+ client grpc.Client
+ conn *grpc.ClientConn
}
type getReader struct {
KeepAlivePeriod: 5 * time.Second,
}
+var defaultClientAddr string
+
var defaultClient Client
var defaultClientMu sync.Mutex
-func IsEphsPath(p string) bool {
- return strings.HasPrefix(p, KeyPrefix)
-}
-
type notFoundError struct {
ephsPath string
}
})
}
+func withGrpcClientOptions(opts ...grpc.ClientOption) ClientOption {
+ return option.NewOption(func(c *clientImpl) error {
+ c.grpcOpts = append(c.grpcOpts, opts...)
+ return nil
+ })
+}
+
func DefaultClient() (Client, error) {
defaultClientMu.Lock()
defer defaultClientMu.Unlock()
DefaultClientContext, _ = context.Interruptible()
}
- client, err := NewClient(DefaultClientContext, mtls.DefaultIdentity())
+ var opts []ClientOption
+ if defaultClientAddr != "" {
+ addrs := strings.Split(defaultClientAddr, ",")
+ opts = append(opts, withGrpcClientOptions(grpc.WithStaticAddress(addrs...)))
+ }
+
+ client, err := NewClient(DefaultClientContext, mtls.DefaultIdentity(), opts...)
if err != nil {
return nil, err
}
defaultCtx: ctx,
defaultTimeout: 15 * time.Second,
id: localId,
+ grpcOpts: []grpc.ClientOption{
+ grpc.WithConnectionFactory(&grpc_common.QUICConnectionFactory{
+ QUICConfig: ephsQuicConfig.Clone(),
+ }),
+ },
}
for _, opt := range opts {
var err error
if c.client == nil {
- c.client, err = grpc.NewGrpcClient(c.defaultCtx, "ephs", c.id,
- grpc.WithConnectionFactory(&grpc_common.QUICConnectionFactory{
- QUICConfig: ephsQuicConfig.Clone(),
- }))
+ c.client, err = grpc.NewGrpcClient(c.defaultCtx, "ephs", c.id, c.grpcOpts...)
if err != nil {
return nil, fmt.Errorf("error creating grpc client: %T: %v", err, err)
}
--- /dev/null
+load("@rules_go//go:def.bzl", "go_library")
+
+go_library(
+ name = "ephsll",
+ srcs = [
+ "fs_object.go",
+ "low_level_client.go",
+ "s3.go",
+ ],
+ importpath = "go.fuhry.dev/runtime/ephs/ephsll",
+ visibility = ["//visibility:public"],
+ deps = [
+ "//constants",
+ "//ephs",
+ "//mtls",
+ "//proto/service/ephs",
+ "//sd",
+ "//utils/log",
+ "//utils/option",
+ "@com_github_minio_minio_go_v7//:minio-go",
+ "@com_github_minio_minio_go_v7//pkg/credentials",
+ "@io_etcd_go_etcd_client_v3//:client",
+ "@io_etcd_go_etcd_client_v3//concurrency",
+ "@org_golang_google_grpc//codes",
+ "@org_golang_google_grpc//status",
+ "@org_golang_google_protobuf//proto",
+ "@org_golang_google_protobuf//types/known/timestamppb",
+ ],
+)
-package servicer
+package ephsll
import (
"bytes"
const LargeFileThreshold = ephs.ChunkSize
-type FsEntry struct {
+type fsEntryWrapper struct {
*ephs_proto.FsEntry
+ s3Client *minio.Client
}
var ErrDirectory = errors.New("is a directory")
var RegexpLargeFileKey = regexp.MustCompile(`^[0-9A-Za-z_-]{64}$`)
-func (e *FsEntry) GetContents(ctx context.Context, s3 *minio.Client) (io.Reader, error) {
+func (e *fsEntryWrapper) Proto() *ephs_proto.FsEntry {
+ return e.FsEntry
+}
+
+func (e *fsEntryWrapper) GetContents(ctx context.Context) (io.Reader, error) {
switch {
case e.FsEntry.Content.GetDirectory() != nil:
return nil, ErrDirectory
case e.FsEntry.Content.GetFile() != nil:
return e.readFile()
case e.FsEntry.Content.GetLargeFile() != nil:
- return e.readLargeFile(ctx, s3)
+ return e.readLargeFile(ctx)
default:
return nil, errors.New("file content not populated with any known type")
}
}
-func (e *FsEntry) SetContents(ctx context.Context, s3 *minio.Client, r io.Reader, size uint64) error {
+func (e *fsEntryWrapper) SetContents(ctx context.Context, r io.Reader, size uint64) error {
if e.FsEntry.Content.GetDirectory() != nil {
return ErrDirectory
}
var err error
if size >= LargeFileThreshold {
- err = e.writeLargeFile(ctx, s3, r, size)
+ err = e.writeLargeFile(ctx, r, size)
} else {
err = e.writeFile(r, size)
}
if mustCleanupLargeFile {
log.Default().Infof("mustCleanupLargeFile: %+v", oldLargeFile)
- err = e.cleanupOldLargeFile(ctx, s3, &oldLargeFile)
+ err = e.cleanupOldLargeFile(ctx, &oldLargeFile)
if err != nil {
log.Default().Alert(
"error while cleaning up old large file with key %q: %v",
return nil
}
-func (e *FsEntry) DeleteContents(ctx context.Context, s3 *minio.Client) error {
+func (e *fsEntryWrapper) DeleteContents(ctx context.Context) error {
switch {
case e.FsEntry.Content.GetDirectory() != nil:
return ErrDirectory
e.FsEntry.Content = nil
return nil
case e.FsEntry.Content.GetLargeFile() != nil:
- return e.cleanupOldLargeFile(ctx, s3, e.Content.GetLargeFile())
+ return e.cleanupOldLargeFile(ctx, e.Content.GetLargeFile())
default:
return errors.New("file content not populated with any known type")
}
}
-func (e *FsEntry) readFile() (io.Reader, error) {
+func (e *fsEntryWrapper) readFile() (io.Reader, error) {
file := e.FsEntry.Content.GetFile()
if file == nil {
return nil, errors.New("internal error: readFile() called but file is unpopulated")
}
}
-func (e *FsEntry) readLargeFile(ctx context.Context, s3 *minio.Client) (io.Reader, error) {
+func (e *fsEntryWrapper) readLargeFile(ctx context.Context) (io.Reader, error) {
+ if e.s3Client == nil {
+ return nil, errors.New("s3 client not initialized, unable to read large file")
+ }
+
if e.Content == nil || e.Content.GetLargeFile() == nil {
return nil, errors.New("not a large file")
}
return nil, errors.New("large file key contains invalid characters or is the wrong length")
}
- return s3.GetObject(ctx, s3Bucket, s3Prefix+key, minio.GetObjectOptions{})
+ return e.s3Client.GetObject(ctx, s3Bucket, s3Prefix+key, minio.GetObjectOptions{})
}
-func (e *FsEntry) writeFile(r io.Reader, size uint64) error {
+func (e *fsEntryWrapper) writeFile(r io.Reader, size uint64) error {
var compression = ephs_proto.FsEntry_File_GZIP
if f := e.FsEntry.Content.GetFile(); f != nil {
return nil
}
-func (e *FsEntry) writeLargeFile(ctx context.Context, s3 *minio.Client, r io.Reader, size uint64) error {
+func (e *fsEntryWrapper) writeLargeFile(ctx context.Context, r io.Reader, size uint64) error {
+ if e.s3Client == nil {
+ return errors.New("s3 client not initialized")
+ }
+
key := newLargeFileKey()
log.Default().Noticef("attempting to PutObject into bucket %s at key %s", s3Bucket, key)
- upload, err := s3.PutObject(ctx, s3Bucket, s3Prefix+key, r, int64(size), minio.PutObjectOptions{})
+ upload, err := e.s3Client.PutObject(ctx, s3Bucket, s3Prefix+key, r, int64(size), minio.PutObjectOptions{})
if err != nil {
log.Default().Error(err)
return err
return nil
}
-func (e *FsEntry) cleanupOldLargeFile(ctx context.Context, s3 *minio.Client, lf *ephs_proto.FsEntry_LargeFile) error {
+func (e *fsEntryWrapper) cleanupOldLargeFile(ctx context.Context, lf *ephs_proto.FsEntry_LargeFile) error {
+ if e.s3Client == nil {
+ return errors.New("s3 client not initialized")
+ }
key := lf.GetKey()
if !RegexpLargeFileKey.MatchString(key) {
return errors.New("large file key contains invalid characters or is the wrong length")
}
- return s3.RemoveObject(ctx, s3Bucket, s3Prefix+key, minio.RemoveObjectOptions{})
+ return e.s3Client.RemoveObject(ctx, s3Bucket, s3Prefix+key, minio.RemoveObjectOptions{})
}
func newLargeFileKey() string {
--- /dev/null
+// Package ephsll is the low-level client for ephs.
+//
+// The low-level client handles all direct interaction with etcd.
+
+package ephsll
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "slices"
+ "sync"
+
+ "github.com/minio/minio-go/v7"
+ "github.com/minio/minio-go/v7/pkg/credentials"
+ etcd_client "go.etcd.io/etcd/client/v3"
+ etcd_concurrency "go.etcd.io/etcd/client/v3/concurrency"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "google.golang.org/protobuf/proto"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ "go.fuhry.dev/runtime/ephs"
+ "go.fuhry.dev/runtime/mtls"
+ ephs_proto "go.fuhry.dev/runtime/proto/service/ephs"
+ "go.fuhry.dev/runtime/sd"
+ "go.fuhry.dev/runtime/utils/log"
+ "go.fuhry.dev/runtime/utils/option"
+)
+
+type Option = option.Option[*ephsLowLevelClientImpl]
+
+type EphsLowLevelClient interface {
+ Stat(ctx context.Context, path ephs.Path) (*ephs_proto.FsEntry, error)
+ Get(ctx context.Context, path ephs.Path) (*ephs_proto.FsEntry, io.Reader, error)
+ Mkdir(ctx context.Context, path ephs.Path, owner string, recursive bool) error
+ Delete(ctx context.Context, path ephs.Path, recursive bool) error
+ Put(ctx context.Context, path ephs.Path, owner string, version, size uint64, contents io.Reader) (*ephs_proto.FsEntry, error)
+ Watch(ctx context.Context, path ephs.Path) (<-chan *ephs_proto.WatchResponse, error)
+}
+
+type ephsLowLevelClientImpl struct {
+ id mtls.Identity
+ logger log.Logger
+ clientMu sync.Mutex
+ clients map[string]*etcd_client.Client
+ s3Creds *credentials.Credentials
+ s3Client *minio.Client
+}
+
+var _ EphsLowLevelClient = &ephsLowLevelClientImpl{}
+
+func NewEphsLowLevelClient(id mtls.Identity, opts ...Option) (EphsLowLevelClient, error) {
+ ll := &ephsLowLevelClientImpl{
+ id: id,
+ logger: log.WithPrefix("EphsLowLevelClient"),
+ clients: make(map[string]*etcd_client.Client),
+ }
+
+ for _, opt := range opts {
+ if err := opt.Apply(ll); err != nil {
+ return nil, err
+ }
+ }
+
+ if _, err := ll.clientForCell(ephs.DefaultCell()); err != nil {
+ return nil, err
+ }
+
+ s3, err := ll.newS3Client()
+ if err != nil {
+ return nil, err
+ }
+ ll.s3Client = s3
+
+ return ll, nil
+}
+
+// Stat
+func (c *ephsLowLevelClientImpl) Stat(ctx context.Context, path ephs.Path) (*ephs_proto.FsEntry, error) {
+ ent, err := c.getProtoForPath(ctx, path)
+ if err != nil {
+ return nil, err
+ }
+
+ // Never send file contents in a stat response
+ if _, ok := ent.Content.GetContent().(*ephs_proto.FsEntry_Content_File); ok {
+ ent.Content = &ephs_proto.FsEntry_Content{
+ Content: &ephs_proto.FsEntry_Content_File{
+ File: &ephs_proto.FsEntry_File{},
+ },
+ }
+ }
+
+ return ent, nil
+}
+
+// Get
+func (c *ephsLowLevelClientImpl) Get(ctx context.Context, path ephs.Path) (*ephs_proto.FsEntry, io.Reader, error) {
+ ent, err := c.getProtoForPath(ctx, path)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ if ent.Content.GetDirectory() != nil {
+ return nil, nil, ErrDirectory
+ }
+
+ reader, err := (&fsEntryWrapper{FsEntry: ent, s3Client: c.s3Client}).GetContents(ctx)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ return ent, reader, nil
+}
+
+// Mkdir
+func (c *ephsLowLevelClientImpl) Mkdir(ctx context.Context, path ephs.Path, owner string, recursive bool) error {
+ return c.mkdir(ctx, path, owner, recursive)
+}
+
+// Delete
+func (c *ephsLowLevelClientImpl) Delete(ctx context.Context, ephsPath ephs.Path, recursive bool) error {
+ entry, err := c.getProtoForPath(ctx, ephsPath)
+ if err != nil {
+ return err
+ }
+
+ fse := &fsEntryWrapper{FsEntry: entry, s3Client: c.s3Client}
+ if fse.Content.GetDirectory() != nil {
+ if len(fse.Content.GetDirectory().Entries) > 0 && !recursive {
+ return status.Errorf(codes.FailedPrecondition,
+ "refusing to delete non-empty directory %q without recursion flag set",
+ ephsPath.String())
+ }
+
+ for _, ent := range fse.Content.GetDirectory().Entries {
+ if err := c.Delete(ctx, ephsPath.Child(ent.Name), recursive); err != nil {
+ return err
+ }
+ }
+ } else {
+ if err := fse.DeleteContents(ctx); err != nil {
+ return err
+ }
+ }
+
+ err = c.modifyDirectory(ctx, ephsPath.Parent(), entry.Owner, func(dEnt *ephs_proto.FsEntry_Directory, stm etcd_concurrency.STM) error {
+ index := -1
+ for i, ent := range dEnt.Entries {
+ if ent.Name == ephsPath.Basename() {
+ index = i
+ break
+ }
+ }
+ if index >= 0 {
+ dEnt.Entries = slices.Delete(dEnt.Entries, index, index+1)
+ }
+ stm.Del(ephsPath.EtcdKey())
+ return nil
+ })
+
+ return err
+}
+
+// Put
+func (c *ephsLowLevelClientImpl) Put(ctx context.Context, ephsPath ephs.Path, owner string, version, size uint64, contents io.Reader) (*ephs_proto.FsEntry, error) {
+ _, err := c.getProtoForPath(ctx, ephsPath.Parent())
+ if err != nil {
+ if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
+ return nil, status.Errorf(
+ codes.FailedPrecondition,
+ "cannot write %q: parent directory %q does not exist",
+ ephsPath.String(), ephsPath.Parent().String())
+ }
+ return nil, err
+ }
+
+ pbEnt, err := c.getProtoForPath(ctx, ephsPath)
+ if err != nil {
+ if status, ok := status.FromError(err); ok && status.Code() == codes.NotFound {
+ // NotFound is only returned after all access checks have succeeded. Ok to
+ // create file.
+ now := timestamppb.Now()
+ pbEnt = &ephs_proto.FsEntry{
+ Created: now,
+ Modified: now,
+ Version: version - 1,
+ Size: size,
+ Owner: owner,
+ }
+ } else {
+ // some other error besides not-found
+ return nil, err
+ }
+ } else {
+ if version != 0 && version != (pbEnt.Version+1) {
+ return nil, status.Errorf(
+ codes.FailedPrecondition,
+ "version conflict: attempted to overwrite version %d with %d, "+
+ "version number must be current version + 1",
+ pbEnt.Version,
+ version)
+ }
+ }
+ ent := &fsEntryWrapper{FsEntry: pbEnt, s3Client: c.s3Client}
+ if pbEnt.Content.GetDirectory() != nil {
+ return nil, status.Errorf(codes.FailedPrecondition,
+ "cannot write %q: path already exists and is a directory",
+ ephsPath.String())
+ }
+
+ if err := ent.SetContents(ctx, contents, size); err != nil {
+ return nil, err
+ }
+
+ ent.FsEntry.Modified = timestamppb.Now()
+ ent.FsEntry.Size = size
+ ent.FsEntry.Version++
+
+ marshaled, err := proto.Marshal(ent.FsEntry)
+ if err != nil {
+ return nil, err
+ }
+
+ err = c.modifyDirectory(ctx, ephsPath.Parent(), owner, func(dEnt *ephs_proto.FsEntry_Directory, stm etcd_concurrency.STM) error {
+ stm.Put(ephsPath.EtcdKey(), string(marshaled))
+ for _, ent := range dEnt.Entries {
+ if ent.Name == ephsPath.Basename() {
+ return nil
+ }
+ }
+
+ dEnt.Entries = append(dEnt.Entries, &ephs_proto.FsEntry_Directory_DirectoryEntry{
+ Name: ephsPath.Basename(),
+ Directory: false,
+ })
+
+ return nil
+ })
+
+ if err != nil {
+ _ = ent.DeleteContents(ctx)
+ return nil, err
+ }
+
+ return ent.FsEntry, nil
+}
+
+// Watch
+func (c *ephsLowLevelClientImpl) Watch(ctx context.Context, ephsPath ephs.Path) (<-chan *ephs_proto.WatchResponse, error) {
+ ctx = etcd_client.WithRequireLeader(ctx)
+ resp := &ephs_proto.WatchResponse{
+ Entry: nil,
+ Event: ephs_proto.WatchResponse_DELETE,
+ }
+
+ if fse, err := c.getProtoForPath(ctx, ephsPath); err == nil {
+ resp.Entry = fse
+ resp.Event = ephs_proto.WatchResponse_CREATE
+ }
+
+ etcd, err := c.clientForCell(ephsPath.Cell())
+ if err != nil {
+ return nil, err
+ }
+
+ ch := make(chan *ephs_proto.WatchResponse)
+
+ go c.watch(ctx, ephsPath, ch, resp, etcd)
+
+ return ch, nil
+}
+
+func (c *ephsLowLevelClientImpl) watch(ctx context.Context, ephsPath ephs.Path, ch chan *ephs_proto.WatchResponse, resp *ephs_proto.WatchResponse, etcd *etcd_client.Client) {
+ defer close(ch)
+
+ ch <- resp
+
+ for ctx.Err() == nil {
+ watcher := etcd.Watch(ctx, ephsPath.EtcdKey())
+ for msg := range watcher {
+ c.logger.Debugf("watcher: got event: %+v", msg)
+ for _, event := range msg.Events {
+ if string(event.Kv.Key) != ephsPath.EtcdKey() {
+ continue
+ }
+
+ resp.Entry = &ephs_proto.FsEntry{}
+
+ if event.IsCreate() {
+ resp.Event = ephs_proto.WatchResponse_CREATE
+ } else if event.IsModify() {
+ resp.Event = ephs_proto.WatchResponse_MODIFY
+ } else if event.Type == etcd_client.EventTypeDelete {
+ resp.Entry = nil
+ resp.Event = ephs_proto.WatchResponse_DELETE
+ } else {
+ continue
+ }
+
+ if resp.Entry != nil {
+ if err := proto.Unmarshal(event.Kv.Value, resp.Entry); err != nil {
+ c.logger.Errorf("protobuf unmarshal error: %v", err)
+ return
+ }
+ }
+
+ ch <- resp
+ }
+ }
+ }
+}
+
+func (c *ephsLowLevelClientImpl) mkdir(ctx context.Context, ephsPath ephs.Path, owner string, recursive bool) error {
+ etcd, err := c.clientForCell(ephsPath.Cell())
+ if err != nil {
+ return err
+ }
+
+ txn, err := etcd_concurrency.NewSTM(etcd, func(stm etcd_concurrency.STM) error {
+ var (
+ dir *ephs_proto.FsEntry = newDir(owner)
+ err error
+ )
+ for _, ancestor := range ephsPath.Lineage() {
+ parent := ancestor.Parent()
+ if ancestor == ancestor.Parent() {
+ continue
+ }
+ dirRaw := []byte(stm.Get(parent.EtcdKey()))
+ if len(dirRaw) == 0 {
+ if !recursive {
+ return status.Errorf(codes.NotFound,
+ "cannot mkdir %q: parent directory %q does not exist and recursion is disabled",
+ ephsPath.String(), parent)
+ }
+
+ dir = newDir(owner)
+ } else {
+ err = proto.Unmarshal(dirRaw, dir)
+ if err != nil {
+ return status.Errorf(codes.Internal,
+ "error unmarshaling parent directory %q: %T: %v",
+ parent, err, err)
+ }
+
+ if dir.Content.GetDirectory() == nil {
+ return status.Errorf(
+ codes.FailedPrecondition,
+ "cannot mkdir %q: parent item %q is not a directory (raw len: %d)",
+ ephsPath.String(), parent, len(dirRaw))
+ }
+
+ dir.Modified = timestamppb.Now()
+ dir.Version += 1
+ }
+
+ dEnt := dir.Content.GetDirectory()
+
+ dEnt.Entries = append(dEnt.Entries, &ephs_proto.FsEntry_Directory_DirectoryEntry{
+ Name: ancestor.Basename(),
+ Directory: true,
+ })
+
+ dirRaw, err = proto.Marshal(dir)
+ if err != nil {
+ return status.Errorf(codes.Internal,
+ "error marshaling parent directory %q: %T: %v",
+ parent, err, err)
+ }
+
+ stm.Put(parent.EtcdKey(), string(dirRaw))
+ }
+
+ entry := []byte(stm.Get(ephsPath.EtcdKey()))
+ if len(entry) > 0 {
+ return status.Errorf(codes.AlreadyExists,
+ "cannot create directory %q: already exists",
+ ephsPath.String())
+ }
+
+ dirRaw, err := proto.Marshal(newDir(owner))
+ if err != nil {
+ return status.Errorf(codes.Internal,
+ "error marshaling new directory %q: %T: %v",
+ ephsPath.String(), err, err)
+ }
+
+ stm.Put(ephsPath.EtcdKey(), string(dirRaw))
+ return nil
+ }, etcd_concurrency.WithAbortContext(ctx))
+ if err != nil {
+ return err
+ }
+ if !txn.Succeeded {
+ return fmt.Errorf("transaction failed: %+v", txn)
+ }
+
+ return nil
+}
+
+func (c *ephsLowLevelClientImpl) modifyDirectory(ctx context.Context, ephsPath ephs.Path, owner string, apply func(*ephs_proto.FsEntry_Directory, etcd_concurrency.STM) error) (err error) {
+ etcd, err := c.clientForCell(ephsPath.Cell())
+ if err != nil {
+ return err
+ }
+
+ txnResp, err := etcd_concurrency.NewSTM(etcd, func(stm etcd_concurrency.STM) error {
+ dirRaw := []byte(stm.Get(ephsPath.EtcdKey()))
+ dir := newDir(owner)
+ if len(dirRaw) > 0 {
+ if err := proto.Unmarshal(dirRaw, dir); err != nil {
+ return fmt.Errorf("error unmarshaling FsEntry at %q: %v", ephsPath.String(), err)
+ }
+ }
+
+ if dir.Content.GetDirectory() == nil {
+ return fmt.Errorf("ephs path %q: not a directory", ephsPath.String())
+ }
+
+ if err = apply(dir.Content.GetDirectory(), stm); err != nil {
+ return fmt.Errorf("error performing atomic directory modification: %v", err)
+ }
+
+ dir.Modified = timestamppb.Now()
+ dir.Version += 1
+
+ if dirRaw, err = proto.Marshal(dir); err != nil {
+ return fmt.Errorf("error re-marshaling FsEntry at %q after calling apply: %v", ephsPath.String(), err)
+ }
+
+ stm.Put(ephsPath.EtcdKey(), string(dirRaw))
+
+ return nil
+ }, etcd_concurrency.WithAbortContext(ctx))
+
+ c.logger.Debugf("transaction status modifying directory %q: %+v", ephsPath.String(), txnResp)
+
+ return err
+}
+
+func (c *ephsLowLevelClientImpl) getProtoForPath(ctx context.Context, path ephs.Path) (*ephs_proto.FsEntry, error) {
+ var err error
+ etcd, err := c.clientForCell(path.Cell())
+ if err != nil {
+ return nil, err
+ }
+
+ obj, err := etcd.Get(ctx, path.EtcdKey())
+ if err != nil {
+ return nil, err
+ }
+
+ if len(obj.Kvs) < 1 {
+ return nil, status.Error(
+ codes.NotFound,
+ path.String())
+ }
+
+ entry := &ephs_proto.FsEntry{}
+ if err := proto.Unmarshal(obj.Kvs[0].Value, entry); err != nil {
+ return nil, status.Errorf(
+ codes.DataLoss,
+ "failed to unmarshal key %q as %s: %v",
+ obj.Kvs[0].Key,
+ proto.MessageName(entry),
+ err)
+ }
+
+ return entry, nil
+}
+
+func (c *ephsLowLevelClientImpl) clientForCell(cell string) (*etcd_client.Client, error) {
+ c.clientMu.Lock()
+ defer c.clientMu.Unlock()
+
+ if _, ok := c.clients[cell]; !ok {
+ client, err := sd.NewEtcdClient(
+ c.id,
+ cell,
+ )
+ if err != nil {
+ return nil, err
+ }
+ c.clients[cell] = client
+ }
+
+ return c.clients[cell], nil
+}
+
+func newDir(owner string) *ephs_proto.FsEntry {
+ return &ephs_proto.FsEntry{
+ Content: &ephs_proto.FsEntry_Content{
+ Content: &ephs_proto.FsEntry_Content_Directory{
+ Directory: &ephs_proto.FsEntry_Directory{},
+ },
+ },
+ Owner: owner,
+ Created: timestamppb.Now(),
+ Modified: timestamppb.Now(),
+ Version: 1,
+ }
+}
-package servicer
+package ephsll
import (
"errors"
var s3Prefix = ""
func WithAWSEnvCredentials() Option {
- return option.NewOption(func(s *ephsServicer) error {
+ return option.NewOption(func(s *ephsLowLevelClientImpl) error {
s.s3Creds = credentials.NewEnvAWS()
cc := &credentials.CredContext{
}
func WithAWSCredentialFile(filename string) Option {
- return option.NewOption(func(s *ephsServicer) error {
+ return option.NewOption(func(s *ephsLowLevelClientImpl) error {
s.s3Creds = credentials.NewFileAWSCredentials(filename, "default")
cc := &credentials.CredContext{
})
}
-func (s *ephsServicer) newS3Client() (*minio.Client, error) {
+func (s *ephsLowLevelClientImpl) newS3Client() (*minio.Client, error) {
if !flag.Parsed() {
return nil, errors.New("flags not yet parsed")
}
--- /dev/null
+package ephs
+
+import (
+ "errors"
+ "flag"
+ "path"
+ "strings"
+
+ "go.fuhry.dev/runtime/utils/hostname"
+)
+
+type Path interface {
+ AclPath() string
+ Cell() string
+ EtcdKey() string
+ LiteralCell() string
+ String() string
+ Lineage() []Path
+ // Parent returns the parent directory of this path.
+ //
+ // If this path is the root path of a cell,
+ Parent() Path
+ Parents() []Path
+ Child(string) Path
+ Basename() string
+}
+
+type ephsPath string
+
+const (
+ localCell = "local"
+ cellIndex = 2
+)
+
+var ephsDefaultCell string
+
+var ErrNotEphsPath = errors.New("not an ephs path")
+
+func DefaultCell() string {
+ return ephsDefaultCell
+}
+
+func IsEphsPath(p string) bool {
+ if !strings.HasPrefix(p, KeyPrefix) {
+ return false
+ }
+
+ // "/ephs/cell.example.com/subdir"
+ // 0 1 2 3
+ parts := strings.Split(p, PathSeparator)
+ if len(parts) < 3 {
+ return false
+ }
+
+ // all path components must be non-empty except the first and last
+ for i := 1; i < len(parts)-1; i++ {
+ if parts[i] == "" {
+ return false
+ }
+ }
+
+ // "." and ".." are not supported as path components
+ for _, c := range parts {
+ if c == "." || c == ".." {
+ return false
+ }
+ }
+
+ return true
+}
+
+func ParsePath(p string) (Path, error) {
+ if !IsEphsPath(p) {
+ return nil, ErrNotEphsPath
+ }
+
+ return ephsPath(p), nil
+}
+
+func (p ephsPath) AclPath() string {
+ parts := strings.Split(string(p), PathSeparator)
+ return strings.Join(parts[3:], PathSeparator)
+}
+
+func (p ephsPath) Basename() string {
+ return path.Base(string(p))
+}
+
+func (p ephsPath) Cell() string {
+ cell := p.LiteralCell()
+ if cell == localCell {
+ return ephsDefaultCell
+ }
+ return cell
+}
+
+func (p ephsPath) LiteralCell() string {
+ parts := strings.Split(string(p), PathSeparator)
+ return parts[cellIndex]
+}
+
+func (p ephsPath) EtcdKey() string {
+ parts := strings.Split(string(p), PathSeparator)
+ if parts[cellIndex] == localCell {
+ parts[cellIndex] = ephsDefaultCell
+ }
+
+ return strings.Join(parts, PathSeparator)
+}
+
+func (p ephsPath) Parent() Path {
+ parts := strings.Split(string(p), PathSeparator)
+ if len(parts) == cellIndex+1 {
+ return p
+ }
+
+ parent := strings.Join(parts[:len(parts)-1], PathSeparator)
+ return ephsPath(parent)
+}
+
+func (p ephsPath) Parents() []Path {
+ var parents []Path
+ parts := strings.Split(string(p), PathSeparator)
+ for i := 3; i < len(parts); i++ {
+ parents = append(parents, ephsPath(strings.Join(parts[:i], PathSeparator)))
+ }
+
+ return parents
+}
+
+func (p ephsPath) Lineage() []Path {
+ lineage := p.Parents()
+ lineage = append(lineage, p)
+ return lineage
+}
+
+func (p ephsPath) Child(name string) Path {
+ return ephsPath(string(p) + PathSeparator + name)
+}
+
+func (p ephsPath) String() string {
+ return string(p)
+}
+
+func init() {
+ flag.StringVar(&defaultClientAddr, "ephs.addr", "", "comma-separated list of server addresses for ephs; leave blank to use service discovery")
+ flag.StringVar(&ephsDefaultCell, "ephs.cell", hostname.DomainName(), "ephs cell")
+}
name = "servicer",
srcs = [
"acl.go",
- "fs_object.go",
- "s3.go",
"servicer.go",
"writer.go",
],
importpath = "go.fuhry.dev/runtime/ephs/servicer",
visibility = ["//visibility:public"],
deps = [
- "//constants",
"//ephs",
+ "//ephs/ephsll",
"//grpc",
"//mtls",
"//mtls/fsnotify",
"//proto/service/ephs",
- "//sd",
- "//utils/hostname",
"//utils/log",
"//utils/option",
"//utils/stringmatch",
- "@com_github_minio_minio_go_v7//:minio-go",
- "@com_github_minio_minio_go_v7//pkg/credentials",
"@in_gopkg_yaml_v3//:yaml_v3",
- "@io_etcd_go_etcd_client_v3//:client",
- "@io_etcd_go_etcd_client_v3//concurrency",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//peer",
"@org_golang_google_grpc//status",
"@org_golang_google_protobuf//proto",
"@org_golang_google_protobuf//types/known/emptypb",
- "@org_golang_google_protobuf//types/known/timestamppb",
],
)
+// Package servicer implements the gRPC servicer for ephs.
+//
+// The servicer implements all access control logic.
+//
+// Interaction with etcd takes place in the `go.fuhry.dev/runtime/ephs/ephsll` package.
+
package servicer
import (
"bytes"
"context"
"errors"
- "flag"
- "fmt"
"io"
- "path"
- "slices"
- "strings"
- "sync"
-
- "github.com/minio/minio-go/v7"
- "github.com/minio/minio-go/v7/pkg/credentials"
- etcd_client "go.etcd.io/etcd/client/v3"
- etcd_concurrency "go.etcd.io/etcd/client/v3/concurrency"
+
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb"
- "google.golang.org/protobuf/types/known/timestamppb"
"go.fuhry.dev/runtime/ephs"
+ "go.fuhry.dev/runtime/ephs/ephsll"
"go.fuhry.dev/runtime/grpc"
"go.fuhry.dev/runtime/mtls"
"go.fuhry.dev/runtime/mtls/fsnotify"
ephs_proto "go.fuhry.dev/runtime/proto/service/ephs"
- "go.fuhry.dev/runtime/sd"
- "go.fuhry.dev/runtime/utils/hostname"
"go.fuhry.dev/runtime/utils/log"
"go.fuhry.dev/runtime/utils/option"
)
type ephsServicer struct {
ephs_proto.EphsServer
- acl *Acl
- logger log.Logger
- clientMu sync.Mutex
- clients map[string]*etcd_client.Client
- s3Creds *credentials.Credentials
- s3Client *minio.Client
+ acl *Acl
+ logger log.Logger
+ ll ephsll.EphsLowLevelClient
+ llOpts []ephsll.Option
}
var _ ephs_proto.EphsServer = &ephsServicer{}
})
}
+func WithAWSEnvCredentials() Option {
+ return option.NewOption(func(s *ephsServicer) error {
+ s.llOpts = append(s.llOpts, ephsll.WithAWSEnvCredentials())
+ return nil
+ })
+}
+
+func WithAWSCredentialFile(filename string) Option {
+ return option.NewOption(func(s *ephsServicer) error {
+ s.llOpts = append(s.llOpts, ephsll.WithAWSCredentialFile(filename))
+ return nil
+ })
+}
+
func NewEphsServicer(opts ...Option) (ephs_proto.EphsServer, error) {
serv := &ephsServicer{
- clients: make(map[string]*etcd_client.Client),
- logger: log.Default().WithPrefix("ephsServicer"),
- }
- if _, err := serv.clientForCell(cell); err != nil {
- return nil, err
+ logger: log.Default().WithPrefix("ephsServicer"),
}
+
for _, o := range opts {
if err := o.Apply(serv); err != nil {
return nil, err
}
}
- s3, err := serv.newS3Client()
+ ll, err := ephsll.NewEphsLowLevelClient(mtls.DefaultIdentity(), serv.llOpts...)
if err != nil {
return nil, err
}
- serv.s3Client = s3
+ serv.ll = ll
return serv, nil
}
-func (s *ephsServicer) clientForCell(cell string) (*etcd_client.Client, error) {
- s.clientMu.Lock()
- defer s.clientMu.Unlock()
-
- if _, ok := s.clients[cell]; !ok {
- client, err := sd.NewEtcdClient(
- mtls.DefaultIdentity(),
- cell,
- )
- if err != nil {
- return nil, err
- }
- s.clients[cell] = client
- }
-
- return s.clients[cell], nil
-}
-
-func (s *ephsServicer) checkPath(ctx context.Context, path string) (string, error) {
- if !ephs.IsEphsPath(path) {
- return "", status.Errorf(codes.InvalidArgument, "path %q must start with "+ephs.KeyPrefix, path)
- }
-
- parts := strings.Split(path, "/")
-
- if len(parts) < 3 {
- return "", status.Errorf(
+func (s *ephsServicer) checkPath(ctx context.Context, p string) (ephs.Path, error) {
+ ephsPath, err := ephs.ParsePath(p)
+ if err != nil {
+ return nil, status.Errorf(
codes.InvalidArgument,
- "path must be at least 3 levels deep: %q",
- path)
- }
-
- for i, part := range parts {
- if (i > 0 && part == "") || part == "." || part == ".." {
- return "", status.Errorf(codes.InvalidArgument,
- "path %q is invalid: may not contain empty elements, '.' or '..'", path)
- }
+ "failed to parse ephs path %q: %v",
+ p, err)
}
ident, err := identityFromContext(ctx)
if err != nil {
- return "", err
- }
-
- aclPath := strings.Join(parts[3:], "/")
-
- if parts[2] == "local" {
- parts[2] = cell
+ return nil, err
}
if s.acl != nil {
- if !s.acl.Check(ident, aclPath) {
- return "", status.Errorf(
+ if !s.acl.Check(ident, ephsPath.AclPath()) {
+ return nil, status.Errorf(
codes.PermissionDenied,
"access to path %q denied for %q (rule path: %q)",
- strings.Join(parts, "/"),
+ ephsPath.String(),
ident,
- aclPath)
+ ephsPath.AclPath())
}
}
- return strings.Join(parts, "/"), nil
+ return ephsPath, nil
}
func (s *ephsServicer) Stat(ctx context.Context, req *ephs_proto.GetRequest) (*ephs_proto.StatResponse, error) {
s.logRequest(ctx, "Stat", req)
- entry, err := s.getPath(ctx, req.Path)
+ ephsPath, err := s.checkPath(ctx, req.Path)
if err != nil {
return nil, err
}
- return &ephs_proto.StatResponse{
- Entry: entry,
- }, nil
-}
-
-func (s *ephsServicer) getPath(ctx context.Context, path string) (*ephs_proto.FsEntry, error) {
- var err error
- if path, err = s.checkPath(ctx, path); err != nil {
- return nil, err
- }
-
- cell := strings.Split(path, "/")[2]
- etcd, err := s.clientForCell(cell)
- if err != nil {
- return nil, err
- }
-
- obj, err := etcd.Get(ctx, path)
+ entry, err := s.ll.Stat(ctx, ephsPath)
if err != nil {
return nil, err
}
- if len(obj.Kvs) < 1 {
- return nil, status.Error(
- codes.NotFound,
- path)
- }
-
- entry := &ephs_proto.FsEntry{}
- if err := proto.Unmarshal(obj.Kvs[0].Value, entry); err != nil {
- return nil, status.Errorf(
- codes.DataLoss,
- "failed to unmarshal key %q as %s: %v",
- obj.Kvs[0].Key,
- proto.MessageName(entry),
- err)
- }
-
- return entry, nil
-}
-
-func (s *ephsServicer) modifyDirectory(ctx context.Context, path string, apply func(*ephs_proto.FsEntry_Directory, etcd_concurrency.STM) error) error {
- var err error
- if path, err = s.checkPath(ctx, path); err != nil {
- return err
- }
-
- cell := strings.Split(path, "/")[2]
- etcd, err := s.clientForCell(cell)
- if err != nil {
- return err
- }
-
- txnResp, err := etcd_concurrency.NewSTM(etcd, func(stm etcd_concurrency.STM) error {
- dirRaw := []byte(stm.Get(path))
- dir := newDir(ctx)
- if len(dirRaw) > 0 {
- if err := proto.Unmarshal(dirRaw, dir); err != nil {
- return fmt.Errorf("error unmarshaling FsEntry at %q: %v", path, err)
- }
- }
-
- if dir.Content.GetDirectory() == nil {
- return fmt.Errorf("ephs path %q: not a directory", path)
- }
-
- if err = apply(dir.Content.GetDirectory(), stm); err != nil {
- return fmt.Errorf("error performing atomic directory modification: %v", err)
- }
-
- dir.Modified = timestamppb.Now()
- dir.Version += 1
-
- if dirRaw, err = proto.Marshal(dir); err != nil {
- return fmt.Errorf("error re-marshaling FsEntry at %q after calling apply: %v", path, err)
- }
-
- stm.Put(path, string(dirRaw))
-
- return nil
- })
-
- s.logger.Debugf("transaction status modifying directory %q: %+v", path, txnResp)
-
- return err
+ return &ephs_proto.StatResponse{
+ Entry: entry,
+ }, nil
}
func (s *ephsServicer) logRequest(ctx context.Context, method string, req msgWithPath) {
ctx := server.Context()
s.logRequest(ctx, "Get", req)
- entry, err := s.getPath(ctx, req.Path)
+ ephsPath, err := s.checkPath(ctx, req.Path)
if err != nil {
return err
}
- if entry.Content.GetDirectory() != nil {
- return status.Errorf(
- codes.FailedPrecondition,
- "%q: is a directory",
- req.Path)
+ entry, reader, err := s.ll.Get(ctx, ephsPath)
+ if err != nil {
+ if errors.Is(err, ephsll.ErrDirectory) {
+ return status.Errorf(
+ codes.FailedPrecondition,
+ "%q: is a directory",
+ req.Path)
+ }
+
+ return err
}
- entryCopy := *entry
+ entryCopy := proto.Clone(entry).(*ephs_proto.FsEntry)
response := &ephs_proto.GetResponse{
- Entry: &entryCopy,
+ Entry: entryCopy,
}
response.Entry.Content = nil
- ent := &FsEntry{entry}
- reader, err := ent.GetContents(ctx, s.s3Client)
- if err != nil {
- return err
- }
-
writer := &responseWriter{server: server, response: response}
n, err := io.Copy(writer, reader)
if err != nil {
return nil
}
-func (s *ephsServicer) mkdir(ctx context.Context, pathName string, recursive bool) error {
- parts := strings.Split(pathName, "/")
-
- cell := parts[2]
- etcd, err := s.clientForCell(cell)
- if err != nil {
- return err
- }
-
- txn, err := etcd_concurrency.NewSTM(etcd, func(stm etcd_concurrency.STM) error {
- var (
- dir *ephs_proto.FsEntry = newDir(ctx)
- err error
- )
- for i := 3; i < len(parts); i++ {
- parent := strings.Join(parts[:i], "/")
-
- dirRaw := []byte(stm.Get(parent))
- if len(dirRaw) == 0 {
- if !recursive {
- return status.Errorf(codes.NotFound, "cannot mkdir %q: parent directory %q does not exist and recursion is disabled", pathName, parent)
- }
-
- dir = newDir(ctx)
- } else {
- err = proto.Unmarshal(dirRaw, dir)
- if err != nil {
- return status.Errorf(codes.Internal,
- "error unmarshaling parent directory %q: %T: %v",
- parent, err, err)
- }
-
- if dir.Content.GetDirectory() == nil {
- return status.Errorf(
- codes.FailedPrecondition,
- "cannot mkdir %q: parent item %q is not a directory (raw len: %d)",
- pathName, parent, len(dirRaw))
- }
-
- dir.Modified = timestamppb.Now()
- dir.Version += 1
- }
-
- dEnt := dir.Content.GetDirectory()
-
- dEnt.Entries = append(dEnt.Entries, &ephs_proto.FsEntry_Directory_DirectoryEntry{
- Name: parts[i],
- Directory: true,
- })
-
- dirRaw, err = proto.Marshal(dir)
- if err != nil {
- return status.Errorf(codes.Internal,
- "error marshaling parent directory %q: %T: %v",
- parent, err, err)
- }
-
- stm.Put(parent, string(dirRaw))
- }
-
- entry := []byte(stm.Get(pathName))
- if len(entry) > 0 {
- return status.Errorf(codes.AlreadyExists,
- "cannot create directory %q: already exists",
- pathName)
- }
-
- dirRaw, err := proto.Marshal(newDir(ctx))
- if err != nil {
- return status.Errorf(codes.Internal,
- "error marshaling new directory %q: %T: %v",
- pathName, err, err)
- }
-
- stm.Put(pathName, string(dirRaw))
- return nil
- })
- if err != nil {
- return err
- }
- if !txn.Succeeded {
- return fmt.Errorf("transaction failed: %+v", txn)
- }
-
- return nil
-}
-
func (s *ephsServicer) MkDir(ctx context.Context, req *ephs_proto.MkDirRequest) (*emptypb.Empty, error) {
path, err := s.checkPath(ctx, req.Path)
if err != nil {
return nil, err
}
- return &emptypb.Empty{}, s.mkdir(ctx, path, req.Recursive)
+ whoami, _ := identityFromContext(ctx)
+
+ return &emptypb.Empty{}, s.ll.Mkdir(ctx, path, whoami, req.Recursive)
}
func (s *ephsServicer) Delete(ctx context.Context, req *ephs_proto.DeleteRequest) (*emptypb.Empty, error) {
s.logRequest(ctx, "Delete", req)
- targetPath, err := s.checkPath(ctx, req.Path)
- if err != nil {
- return nil, err
- }
-
- entry, err := s.getPath(ctx, targetPath)
+ ephsPath, err := s.checkPath(ctx, req.Path)
if err != nil {
return nil, err
}
- fse := &FsEntry{entry}
- if fse.Content.GetDirectory() != nil {
- if len(fse.Content.GetDirectory().Entries) > 0 && !req.Recursive {
- return nil, status.Errorf(codes.FailedPrecondition,
- "refusing to delete non-empty directory %q without recursion flag set",
- targetPath)
- }
-
- for _, ent := range fse.Content.GetDirectory().Entries {
- subReq := &ephs_proto.DeleteRequest{
- Path: path.Join(targetPath, ent.Name),
- Recursive: true,
- }
- _, err = s.Delete(ctx, subReq)
- if err != nil {
- return nil, err
- }
- }
- } else {
- if err := fse.DeleteContents(ctx, s.s3Client); err != nil {
- return nil, err
- }
- }
-
- dir := path.Dir(targetPath)
- err = s.modifyDirectory(ctx, dir, func(dEnt *ephs_proto.FsEntry_Directory, stm etcd_concurrency.STM) error {
- index := -1
- for i, ent := range dEnt.Entries {
- if ent.Name == path.Base(targetPath) {
- index = i
- break
- }
- }
- if index >= 0 {
- dEnt.Entries = slices.Delete(dEnt.Entries, index, index+1)
- }
- stm.Del(targetPath)
- return nil
- })
- if err != nil {
+ if err := s.ll.Delete(ctx, ephsPath, req.Recursive); err != nil {
return nil, err
}
return err
}
- targetPath, err := s.checkPath(ctx, msg.Path)
- if err != nil {
- return err
- }
-
- dir := path.Dir(targetPath)
- _, err = s.getPath(ctx, dir)
+ ephsPath, err := s.checkPath(ctx, msg.Path)
if err != nil {
- if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
- return status.Errorf(
- codes.FailedPrecondition,
- "cannot write %q: parent directory %q does not exist",
- targetPath, dir)
- }
return err
}
- pbEnt, err := s.getPath(ctx, targetPath)
- if err != nil {
- if status, ok := status.FromError(err); ok && status.Code() == codes.NotFound {
- // NotFound is only returned after all access checks have succeeded. Ok to
- // create file.
- now := timestamppb.Now()
- pbEnt = &ephs_proto.FsEntry{
- Created: now,
- Modified: now,
- Version: msg.Version - 1,
- Size: msg.Size,
- Owner: peerId,
- }
- } else {
- // some other error besides not-found
- return err
- }
- } else {
- if msg.Version != 0 && msg.Version != (pbEnt.Version+1) {
- return status.Errorf(
- codes.FailedPrecondition,
- "version conflict: attempted to overwrite version %d with %d, "+
- "version number must be current version + 1",
- pbEnt.Version,
- msg.Version)
- }
- }
- ent := &FsEntry{pbEnt}
- if pbEnt.Content.GetDirectory() != nil {
- return status.Errorf(codes.FailedPrecondition,
- "cannot write %q: path already exists and is a directory",
- targetPath)
- }
-
reader := &putReader{
initialMsg: msg,
server: server,
buf: &bytes.Buffer{},
}
- if err := ent.SetContents(ctx, s.s3Client, reader, msg.Size); err != nil {
- return err
- }
-
- ent.FsEntry.Modified = timestamppb.Now()
- ent.FsEntry.Size = msg.Size
- ent.FsEntry.Version++
-
- marshaled, err := proto.Marshal(ent.FsEntry)
- if err != nil {
- return err
- }
-
- err = s.modifyDirectory(ctx, dir, func(dEnt *ephs_proto.FsEntry_Directory, stm etcd_concurrency.STM) error {
- stm.Put(targetPath, string(marshaled))
- base := path.Base(targetPath)
- for _, ent := range dEnt.Entries {
- if ent.Name == base {
- return nil
- }
- }
-
- dEnt.Entries = append(dEnt.Entries, &ephs_proto.FsEntry_Directory_DirectoryEntry{
- Name: base,
- Directory: false,
- })
-
- return nil
- })
-
- if err != nil {
- _ = ent.DeleteContents(ctx, s.s3Client)
- return err
- }
+ ent, err := s.ll.Put(ctx, ephsPath, peerId, msg.Version, msg.Size, reader)
response := &ephs_proto.StatResponse{
- Entry: ent.FsEntry,
+ Entry: ent,
}
return server.SendAndClose(response)
}
func (s *ephsServicer) Watch(req *ephs_proto.GetRequest, server ephs_proto.Ephs_WatchServer) error {
- ctx := etcd_client.WithRequireLeader(server.Context())
+ ctx := server.Context()
s.logRequest(ctx, "Watch", req)
- path, err := s.checkPath(ctx, req.Path)
- if err != nil {
- return err
- }
-
- response := &ephs_proto.WatchResponse{
- Entry: nil,
- Event: ephs_proto.WatchResponse_DELETE,
- }
-
- if resp, err := s.getPath(ctx, path); err == nil {
- response.Entry = resp
- response.Event = ephs_proto.WatchResponse_CREATE
- }
-
- cell := strings.Split(path, "/")[2]
- etcd, err := s.clientForCell(cell)
+ ephsPath, err := s.checkPath(ctx, req.Path)
if err != nil {
return err
}
- if err := server.Send(response); err != nil {
- return err
- }
-
for ctx.Err() == nil {
- watcher := etcd.Watch(ctx, path)
- for msg := range watcher {
- s.logger.Debugf("watcher: got event: %+v", msg)
- for _, event := range msg.Events {
- if string(event.Kv.Key) != path {
- continue
- }
-
- response.Entry = &ephs_proto.FsEntry{}
-
- if event.IsCreate() {
- response.Event = ephs_proto.WatchResponse_CREATE
- } else if event.IsModify() {
- response.Event = ephs_proto.WatchResponse_MODIFY
- } else if event.Type == etcd_client.EventTypeDelete {
- response.Entry = nil
- response.Event = ephs_proto.WatchResponse_DELETE
- } else {
- continue
- }
-
- if response.Entry != nil {
- if err = proto.Unmarshal(event.Kv.Value, response.Entry); err != nil {
- s.logger.Errorf("protobuf marshal error: %v", err)
- return err
- }
- }
-
- if err := server.Send(response); err != nil {
- s.logger.Errorf("stream send error: %v", err)
- return err
- }
+ ch, err := s.ll.Watch(ctx, ephsPath)
+ if err != nil {
+ return err
+ }
+
+ for msg := range ch {
+ if err := server.Send(msg); err != nil {
+ return err
}
}
}
- s.logger.Noticef("Watch(%q) ending: ctx err: %v", path, ctx.Err())
+ s.logger.V(1).Debugf("Watch(%q) ending: ctx err: %v", ephsPath.String(), ctx.Err())
if errors.Is(ctx.Err(), context.Canceled) {
return status.Error(codes.Unavailable, "server is shutting down, please reconnect")
return ident.String(), nil
}
-
-func newDir(ctx context.Context) *ephs_proto.FsEntry {
- whoami, _ := identityFromContext(ctx)
-
- return &ephs_proto.FsEntry{
- Content: &ephs_proto.FsEntry_Content{
- Content: &ephs_proto.FsEntry_Content_Directory{
- Directory: &ephs_proto.FsEntry_Directory{},
- },
- },
- Owner: whoami,
- Created: timestamppb.Now(),
- Modified: timestamppb.Now(),
- Version: 1,
- }
-}
-
-func init() {
- flag.StringVar(&cell, "ephs.cell", hostname.DomainName(), "ephs cell")
-}
"bytes"
"fmt"
+ "go.fuhry.dev/runtime/ephs/ephsll"
ephs_proto "go.fuhry.dev/runtime/proto/service/ephs"
"go.fuhry.dev/runtime/utils/log"
)
func (w *responseWriter) Write(p []byte) (int, error) {
log.Default().V(3).Debugf("responseWriter: writing %d bytes", len(p))
- chunkLen := min(len(p), LargeFileThreshold)
+ chunkLen := min(len(p), ephsll.LargeFileThreshold)
w.response.Chunk = make([]byte, chunkLen)
copy(w.response.Chunk, p)
"//net/dns",
"//sd",
"//utils/context",
+ "//utils/log",
"//utils/option",
"@org_golang_google_grpc//:grpc",
],
"context"
"fmt"
"net"
+ "strconv"
"go.fuhry.dev/runtime/grpc/internal/common"
"go.fuhry.dev/runtime/mtls"
+ "go.fuhry.dev/runtime/net/dns"
"go.fuhry.dev/runtime/sd"
+ "go.fuhry.dev/runtime/utils/log"
"go.fuhry.dev/runtime/utils/option"
"google.golang.org/grpc"
)
type client struct {
ctx context.Context
+ logger log.Logger
serverId mtls.Identity
clientId mtls.Identity
watcher AddressProvider
})
}
-func WithStaticAddress(addresses ...*net.TCPAddr) ClientOption {
- var addrs []sd.ServiceAddress
-
- for _, addr := range addresses {
- var ip4, ip6 string
- if len(addr.IP) == 4 {
- ip4 = addr.IP.String()
- } else {
- ip6 = addr.IP.String()
- }
- addrs = append(addrs, sd.ServiceAddress{
- IP4: ip4,
- IP6: ip6,
- Port: uint16(addr.Port),
- })
- }
-
+func WithStaticAddress(addresses ...string) ClientOption {
return option.NewOption(func(c *client) error {
+ var addrs []sd.ServiceAddress
+
+ for i, addr := range addresses {
+ host, port, err := net.SplitHostPort(addr)
+ if err != nil {
+ return fmt.Errorf("failed to parse host:port %q at index %d: %v", addr, i, err)
+ }
+
+ portNumber, err := net.LookupPort("tcp", port)
+ if err != nil {
+ return fmt.Errorf("failed to lookup named port %q at index %d: %v", port, i, err)
+ }
+
+ var ip4, ip6 string
+ if ip := net.ParseIP(host); ip != nil {
+ if len(ip) == 4 {
+ ip4 = ip.String()
+ } else {
+ ip6 = ip.String()
+ }
+ } else {
+ ip4, ip6, err = dns.ResolveDualStack(host)
+ if err != nil {
+ return fmt.Errorf("failed to lookup host %q at index %d: %v", host, i, err)
+ }
+ }
+
+ if ip4 == "" && ip6 == "" {
+ return fmt.Errorf(
+ "failed to parse address %q at index %d: IPv4 and IPv6 addresses both empty",
+ addr, i)
+ }
+
+ addrs = append(addrs, sd.ServiceAddress{
+ Hostname: host,
+ IP4: ip4,
+ IP6: ip6,
+ Port: uint16(portNumber),
+ })
+ }
c.watcher = &staticAddressProvider{
addresses: addrs,
}
func NewGrpcClient(ctx context.Context, serverId string, clientId mtls.Identity, opts ...ClientOption) (Client, error) {
cl := &client{
ctx: ctx,
+ logger: log.Default().WithPrefix(fmt.Sprintf("grpc.Client[%s]", serverId)),
serverId: mtls.NewRemoteServiceIdentity(serverId),
clientId: clientId,
connFac: common.NewDefaultConnectionFactory(),
dialer,
}
- target := fmt.Sprintf("%s:%d", addrs[0].Hostname, addrs[0].Port)
+ target := net.JoinHostPort(addrs[0].Hostname, strconv.Itoa(int(addrs[0].Port)))
+ c.logger.V(1).Infof("establishing conn to %s", target)
conn, err := grpc.DialContext(c.ctx, target, opts...)
if err != nil {
return nil, err