+// Package ephsll is the low-level client for ephs.
+//
+// The low-level client handles all direct interaction with etcd.
+
+package ephsll
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "slices"
+ "sync"
+
+ "github.com/minio/minio-go/v7"
+ "github.com/minio/minio-go/v7/pkg/credentials"
+ etcd_client "go.etcd.io/etcd/client/v3"
+ etcd_concurrency "go.etcd.io/etcd/client/v3/concurrency"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "google.golang.org/protobuf/proto"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ "go.fuhry.dev/runtime/ephs"
+ "go.fuhry.dev/runtime/mtls"
+ ephs_proto "go.fuhry.dev/runtime/proto/service/ephs"
+ "go.fuhry.dev/runtime/sd"
+ "go.fuhry.dev/runtime/utils/log"
+ "go.fuhry.dev/runtime/utils/option"
+)
+
+type Option = option.Option[*ephsLowLevelClientImpl]
+
+type EphsLowLevelClient interface {
+ Stat(ctx context.Context, path ephs.Path) (*ephs_proto.FsEntry, error)
+ Get(ctx context.Context, path ephs.Path) (*ephs_proto.FsEntry, io.Reader, error)
+ Mkdir(ctx context.Context, path ephs.Path, owner string, recursive bool) error
+ Delete(ctx context.Context, path ephs.Path, recursive bool) error
+ Put(ctx context.Context, path ephs.Path, owner string, version, size uint64, contents io.Reader) (*ephs_proto.FsEntry, error)
+ Watch(ctx context.Context, path ephs.Path) (<-chan *ephs_proto.WatchResponse, error)
+}
+
+type ephsLowLevelClientImpl struct {
+ id mtls.Identity
+ logger log.Logger
+ clientMu sync.Mutex
+ clients map[string]*etcd_client.Client
+ s3Creds *credentials.Credentials
+ s3Client *minio.Client
+}
+
+var _ EphsLowLevelClient = &ephsLowLevelClientImpl{}
+
+func NewEphsLowLevelClient(id mtls.Identity, opts ...Option) (EphsLowLevelClient, error) {
+ ll := &ephsLowLevelClientImpl{
+ id: id,
+ logger: log.WithPrefix("EphsLowLevelClient"),
+ clients: make(map[string]*etcd_client.Client),
+ }
+
+ for _, opt := range opts {
+ if err := opt.Apply(ll); err != nil {
+ return nil, err
+ }
+ }
+
+ if _, err := ll.clientForCell(ephs.DefaultCell()); err != nil {
+ return nil, err
+ }
+
+ s3, err := ll.newS3Client()
+ if err != nil {
+ return nil, err
+ }
+ ll.s3Client = s3
+
+ return ll, nil
+}
+
+// Stat
+func (c *ephsLowLevelClientImpl) Stat(ctx context.Context, path ephs.Path) (*ephs_proto.FsEntry, error) {
+ ent, err := c.getProtoForPath(ctx, path)
+ if err != nil {
+ return nil, err
+ }
+
+ // Never send file contents in a stat response
+ if _, ok := ent.Content.GetContent().(*ephs_proto.FsEntry_Content_File); ok {
+ ent.Content = &ephs_proto.FsEntry_Content{
+ Content: &ephs_proto.FsEntry_Content_File{
+ File: &ephs_proto.FsEntry_File{},
+ },
+ }
+ }
+
+ return ent, nil
+}
+
+// Get
+func (c *ephsLowLevelClientImpl) Get(ctx context.Context, path ephs.Path) (*ephs_proto.FsEntry, io.Reader, error) {
+ ent, err := c.getProtoForPath(ctx, path)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ if ent.Content.GetDirectory() != nil {
+ return nil, nil, ErrDirectory
+ }
+
+ reader, err := (&fsEntryWrapper{FsEntry: ent, s3Client: c.s3Client}).GetContents(ctx)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ return ent, reader, nil
+}
+
+// Mkdir
+func (c *ephsLowLevelClientImpl) Mkdir(ctx context.Context, path ephs.Path, owner string, recursive bool) error {
+ return c.mkdir(ctx, path, owner, recursive)
+}
+
+// Delete
+func (c *ephsLowLevelClientImpl) Delete(ctx context.Context, ephsPath ephs.Path, recursive bool) error {
+ entry, err := c.getProtoForPath(ctx, ephsPath)
+ if err != nil {
+ return err
+ }
+
+ fse := &fsEntryWrapper{FsEntry: entry, s3Client: c.s3Client}
+ if fse.Content.GetDirectory() != nil {
+ if len(fse.Content.GetDirectory().Entries) > 0 && !recursive {
+ return status.Errorf(codes.FailedPrecondition,
+ "refusing to delete non-empty directory %q without recursion flag set",
+ ephsPath.String())
+ }
+
+ for _, ent := range fse.Content.GetDirectory().Entries {
+ if err := c.Delete(ctx, ephsPath.Child(ent.Name), recursive); err != nil {
+ return err
+ }
+ }
+ } else {
+ if err := fse.DeleteContents(ctx); err != nil {
+ return err
+ }
+ }
+
+ err = c.modifyDirectory(ctx, ephsPath.Parent(), entry.Owner, func(dEnt *ephs_proto.FsEntry_Directory, stm etcd_concurrency.STM) error {
+ index := -1
+ for i, ent := range dEnt.Entries {
+ if ent.Name == ephsPath.Basename() {
+ index = i
+ break
+ }
+ }
+ if index >= 0 {
+ dEnt.Entries = slices.Delete(dEnt.Entries, index, index+1)
+ }
+ stm.Del(ephsPath.EtcdKey())
+ return nil
+ })
+
+ return err
+}
+
+// Put
+func (c *ephsLowLevelClientImpl) Put(ctx context.Context, ephsPath ephs.Path, owner string, version, size uint64, contents io.Reader) (*ephs_proto.FsEntry, error) {
+ _, err := c.getProtoForPath(ctx, ephsPath.Parent())
+ if err != nil {
+ if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
+ return nil, status.Errorf(
+ codes.FailedPrecondition,
+ "cannot write %q: parent directory %q does not exist",
+ ephsPath.String(), ephsPath.Parent().String())
+ }
+ return nil, err
+ }
+
+ pbEnt, err := c.getProtoForPath(ctx, ephsPath)
+ if err != nil {
+ if status, ok := status.FromError(err); ok && status.Code() == codes.NotFound {
+ // NotFound is only returned after all access checks have succeeded. Ok to
+ // create file.
+ now := timestamppb.Now()
+ pbEnt = &ephs_proto.FsEntry{
+ Created: now,
+ Modified: now,
+ Version: version - 1,
+ Size: size,
+ Owner: owner,
+ }
+ } else {
+ // some other error besides not-found
+ return nil, err
+ }
+ } else {
+ if version != 0 && version != (pbEnt.Version+1) {
+ return nil, status.Errorf(
+ codes.FailedPrecondition,
+ "version conflict: attempted to overwrite version %d with %d, "+
+ "version number must be current version + 1",
+ pbEnt.Version,
+ version)
+ }
+ }
+ ent := &fsEntryWrapper{FsEntry: pbEnt, s3Client: c.s3Client}
+ if pbEnt.Content.GetDirectory() != nil {
+ return nil, status.Errorf(codes.FailedPrecondition,
+ "cannot write %q: path already exists and is a directory",
+ ephsPath.String())
+ }
+
+ if err := ent.SetContents(ctx, contents, size); err != nil {
+ return nil, err
+ }
+
+ ent.FsEntry.Modified = timestamppb.Now()
+ ent.FsEntry.Size = size
+ ent.FsEntry.Version++
+
+ marshaled, err := proto.Marshal(ent.FsEntry)
+ if err != nil {
+ return nil, err
+ }
+
+ err = c.modifyDirectory(ctx, ephsPath.Parent(), owner, func(dEnt *ephs_proto.FsEntry_Directory, stm etcd_concurrency.STM) error {
+ stm.Put(ephsPath.EtcdKey(), string(marshaled))
+ for _, ent := range dEnt.Entries {
+ if ent.Name == ephsPath.Basename() {
+ return nil
+ }
+ }
+
+ dEnt.Entries = append(dEnt.Entries, &ephs_proto.FsEntry_Directory_DirectoryEntry{
+ Name: ephsPath.Basename(),
+ Directory: false,
+ })
+
+ return nil
+ })
+
+ if err != nil {
+ _ = ent.DeleteContents(ctx)
+ return nil, err
+ }
+
+ return ent.FsEntry, nil
+}
+
+// Watch
+func (c *ephsLowLevelClientImpl) Watch(ctx context.Context, ephsPath ephs.Path) (<-chan *ephs_proto.WatchResponse, error) {
+ ctx = etcd_client.WithRequireLeader(ctx)
+ resp := &ephs_proto.WatchResponse{
+ Entry: nil,
+ Event: ephs_proto.WatchResponse_DELETE,
+ }
+
+ if fse, err := c.getProtoForPath(ctx, ephsPath); err == nil {
+ resp.Entry = fse
+ resp.Event = ephs_proto.WatchResponse_CREATE
+ }
+
+ etcd, err := c.clientForCell(ephsPath.Cell())
+ if err != nil {
+ return nil, err
+ }
+
+ ch := make(chan *ephs_proto.WatchResponse)
+
+ go c.watch(ctx, ephsPath, ch, resp, etcd)
+
+ return ch, nil
+}
+
+func (c *ephsLowLevelClientImpl) watch(ctx context.Context, ephsPath ephs.Path, ch chan *ephs_proto.WatchResponse, resp *ephs_proto.WatchResponse, etcd *etcd_client.Client) {
+ defer close(ch)
+
+ ch <- resp
+
+ for ctx.Err() == nil {
+ watcher := etcd.Watch(ctx, ephsPath.EtcdKey())
+ for msg := range watcher {
+ c.logger.Debugf("watcher: got event: %+v", msg)
+ for _, event := range msg.Events {
+ if string(event.Kv.Key) != ephsPath.EtcdKey() {
+ continue
+ }
+
+ resp.Entry = &ephs_proto.FsEntry{}
+
+ if event.IsCreate() {
+ resp.Event = ephs_proto.WatchResponse_CREATE
+ } else if event.IsModify() {
+ resp.Event = ephs_proto.WatchResponse_MODIFY
+ } else if event.Type == etcd_client.EventTypeDelete {
+ resp.Entry = nil
+ resp.Event = ephs_proto.WatchResponse_DELETE
+ } else {
+ continue
+ }
+
+ if resp.Entry != nil {
+ if err := proto.Unmarshal(event.Kv.Value, resp.Entry); err != nil {
+ c.logger.Errorf("protobuf unmarshal error: %v", err)
+ return
+ }
+ }
+
+ ch <- resp
+ }
+ }
+ }
+}
+
+func (c *ephsLowLevelClientImpl) mkdir(ctx context.Context, ephsPath ephs.Path, owner string, recursive bool) error {
+ etcd, err := c.clientForCell(ephsPath.Cell())
+ if err != nil {
+ return err
+ }
+
+ txn, err := etcd_concurrency.NewSTM(etcd, func(stm etcd_concurrency.STM) error {
+ var (
+ dir *ephs_proto.FsEntry = newDir(owner)
+ err error
+ )
+ for _, ancestor := range ephsPath.Lineage() {
+ parent := ancestor.Parent()
+ if ancestor == ancestor.Parent() {
+ continue
+ }
+ dirRaw := []byte(stm.Get(parent.EtcdKey()))
+ if len(dirRaw) == 0 {
+ if !recursive {
+ return status.Errorf(codes.NotFound,
+ "cannot mkdir %q: parent directory %q does not exist and recursion is disabled",
+ ephsPath.String(), parent)
+ }
+
+ dir = newDir(owner)
+ } else {
+ err = proto.Unmarshal(dirRaw, dir)
+ if err != nil {
+ return status.Errorf(codes.Internal,
+ "error unmarshaling parent directory %q: %T: %v",
+ parent, err, err)
+ }
+
+ if dir.Content.GetDirectory() == nil {
+ return status.Errorf(
+ codes.FailedPrecondition,
+ "cannot mkdir %q: parent item %q is not a directory (raw len: %d)",
+ ephsPath.String(), parent, len(dirRaw))
+ }
+
+ dir.Modified = timestamppb.Now()
+ dir.Version += 1
+ }
+
+ dEnt := dir.Content.GetDirectory()
+
+ dEnt.Entries = append(dEnt.Entries, &ephs_proto.FsEntry_Directory_DirectoryEntry{
+ Name: ancestor.Basename(),
+ Directory: true,
+ })
+
+ dirRaw, err = proto.Marshal(dir)
+ if err != nil {
+ return status.Errorf(codes.Internal,
+ "error marshaling parent directory %q: %T: %v",
+ parent, err, err)
+ }
+
+ stm.Put(parent.EtcdKey(), string(dirRaw))
+ }
+
+ entry := []byte(stm.Get(ephsPath.EtcdKey()))
+ if len(entry) > 0 {
+ return status.Errorf(codes.AlreadyExists,
+ "cannot create directory %q: already exists",
+ ephsPath.String())
+ }
+
+ dirRaw, err := proto.Marshal(newDir(owner))
+ if err != nil {
+ return status.Errorf(codes.Internal,
+ "error marshaling new directory %q: %T: %v",
+ ephsPath.String(), err, err)
+ }
+
+ stm.Put(ephsPath.EtcdKey(), string(dirRaw))
+ return nil
+ }, etcd_concurrency.WithAbortContext(ctx))
+ if err != nil {
+ return err
+ }
+ if !txn.Succeeded {
+ return fmt.Errorf("transaction failed: %+v", txn)
+ }
+
+ return nil
+}
+
+func (c *ephsLowLevelClientImpl) modifyDirectory(ctx context.Context, ephsPath ephs.Path, owner string, apply func(*ephs_proto.FsEntry_Directory, etcd_concurrency.STM) error) (err error) {
+ etcd, err := c.clientForCell(ephsPath.Cell())
+ if err != nil {
+ return err
+ }
+
+ txnResp, err := etcd_concurrency.NewSTM(etcd, func(stm etcd_concurrency.STM) error {
+ dirRaw := []byte(stm.Get(ephsPath.EtcdKey()))
+ dir := newDir(owner)
+ if len(dirRaw) > 0 {
+ if err := proto.Unmarshal(dirRaw, dir); err != nil {
+ return fmt.Errorf("error unmarshaling FsEntry at %q: %v", ephsPath.String(), err)
+ }
+ }
+
+ if dir.Content.GetDirectory() == nil {
+ return fmt.Errorf("ephs path %q: not a directory", ephsPath.String())
+ }
+
+ if err = apply(dir.Content.GetDirectory(), stm); err != nil {
+ return fmt.Errorf("error performing atomic directory modification: %v", err)
+ }
+
+ dir.Modified = timestamppb.Now()
+ dir.Version += 1
+
+ if dirRaw, err = proto.Marshal(dir); err != nil {
+ return fmt.Errorf("error re-marshaling FsEntry at %q after calling apply: %v", ephsPath.String(), err)
+ }
+
+ stm.Put(ephsPath.EtcdKey(), string(dirRaw))
+
+ return nil
+ }, etcd_concurrency.WithAbortContext(ctx))
+
+ c.logger.Debugf("transaction status modifying directory %q: %+v", ephsPath.String(), txnResp)
+
+ return err
+}
+
+func (c *ephsLowLevelClientImpl) getProtoForPath(ctx context.Context, path ephs.Path) (*ephs_proto.FsEntry, error) {
+ var err error
+ etcd, err := c.clientForCell(path.Cell())
+ if err != nil {
+ return nil, err
+ }
+
+ obj, err := etcd.Get(ctx, path.EtcdKey())
+ if err != nil {
+ return nil, err
+ }
+
+ if len(obj.Kvs) < 1 {
+ return nil, status.Error(
+ codes.NotFound,
+ path.String())
+ }
+
+ entry := &ephs_proto.FsEntry{}
+ if err := proto.Unmarshal(obj.Kvs[0].Value, entry); err != nil {
+ return nil, status.Errorf(
+ codes.DataLoss,
+ "failed to unmarshal key %q as %s: %v",
+ obj.Kvs[0].Key,
+ proto.MessageName(entry),
+ err)
+ }
+
+ return entry, nil
+}
+
+func (c *ephsLowLevelClientImpl) clientForCell(cell string) (*etcd_client.Client, error) {
+ c.clientMu.Lock()
+ defer c.clientMu.Unlock()
+
+ if _, ok := c.clients[cell]; !ok {
+ client, err := sd.NewEtcdClient(
+ c.id,
+ cell,
+ )
+ if err != nil {
+ return nil, err
+ }
+ c.clients[cell] = client
+ }
+
+ return c.clients[cell], nil
+}
+
+func newDir(owner string) *ephs_proto.FsEntry {
+ return &ephs_proto.FsEntry{
+ Content: &ephs_proto.FsEntry_Content{
+ Content: &ephs_proto.FsEntry_Content_Directory{
+ Directory: &ephs_proto.FsEntry_Directory{},
+ },
+ },
+ Owner: owner,
+ Created: timestamppb.Now(),
+ Modified: timestamppb.Now(),
+ Version: 1,
+ }
+}