]> go.fuhry.dev Git - runtime.git/commitdiff
[ephs] `edit` command, client library and server improvements
authorDan Fuhry <dan@fuhry.com>
Fri, 21 Nov 2025 00:34:00 +0000 (19:34 -0500)
committerDan Fuhry <dan@fuhry.com>
Fri, 21 Nov 2025 00:34:00 +0000 (19:34 -0500)
- fixed server failing to build since 020d9e6d because s3.go referenced the removed `genericOption` type
- add custom error type for not-found errors so they are easier to distinguish
- validate S3 credentials on server startup
- add `edit` command to ephs commandline client to allow editing text files directly in ephs

cmd/ephs_client/BUILD.bazel
cmd/ephs_client/main.go
ephs/client.go
ephs/servicer/s3.go
ephs/servicer/servicer.go

index ee40c3280ae021461d19ed72e5d9d714c25778f4..906286db0cdb03064cdb41275362987d412ffb9c 100644 (file)
@@ -9,6 +9,7 @@ go_library(
         "//constants",
         "//ephs",
         "//utils/context",
+        "//utils/fsutil",
         "//utils/log",
         "@com_github_urfave_cli_v3//:cli",
     ],
index b0b113ddc6993053b67d6ad80b8782e8d78e9b46..da5aeaaee60398b1558da3174eb8db8cd170234b 100644 (file)
@@ -1,19 +1,23 @@
 package main
 
 import (
+       "bytes"
        "errors"
        "flag"
        "fmt"
        "io"
        "os"
+       "os/exec"
        "path"
        "strings"
+       "unicode/utf8"
 
        "github.com/urfave/cli/v3"
 
        "go.fuhry.dev/runtime/constants"
        "go.fuhry.dev/runtime/ephs"
        "go.fuhry.dev/runtime/utils/context"
+       "go.fuhry.dev/runtime/utils/fsutil"
        "go.fuhry.dev/runtime/utils/log"
 )
 
@@ -154,6 +158,103 @@ func cmdMkdir(ctx context.Context, cmd *cli.Command) error {
        return nil
 }
 
+func cmdEdit(ctx context.Context, cmd *cli.Command) error {
+       client, err := ephs.DefaultClient()
+       if err != nil {
+               return err
+       }
+
+       buf := bytes.NewBuffer(nil)
+       temp, err := os.CreateTemp(os.TempDir(), "ephs*")
+       if err != nil {
+               return err
+       }
+       tempFileName := temp.Name()
+       defer os.Remove(tempFileName)
+
+       reader, err := client.GetContext(ctx, cmd.StringArg("path"))
+       if err == nil {
+               tee := io.TeeReader(reader, buf)
+
+               io.Copy(temp, tee)
+               temp.Close()
+
+               if !utf8.Valid(buf.Bytes()) {
+                       return errors.New("cannot edit a binary file")
+               }
+       } else if ephs.IsNotFound(err) {
+               temp.Close()
+               log.Default().Infof("path %q not found, creating new file", cmd.StringArg("path"))
+       } else {
+               temp.Close()
+               return err
+       }
+
+       editor := os.Getenv("EDITOR")
+       if editor == "" {
+               for _, f := range []string{"/usr/bin/vim", "/usr/bin/vi", "/usr/bin/nano"} {
+                       if err := fsutil.FileExistsAndIsReadable(f); err == nil {
+                               editor = f
+                               break
+                       }
+               }
+       }
+       if editor == "" {
+               return errors.New("EDITOR not set and cannot find any suitable fallback")
+       }
+       if !path.IsAbs(editor) {
+               editor, err = exec.LookPath(editor)
+               if err != nil {
+                       return err
+               }
+       }
+
+       editorProc, err := os.StartProcess(
+               editor,
+               []string{
+                       editor,
+                       tempFileName,
+               },
+               &os.ProcAttr{
+                       Files: []*os.File{
+                               os.Stdin,
+                               os.Stdout,
+                               os.Stderr,
+                       },
+               },
+       )
+       if err != nil {
+               return err
+       }
+       editorState, err := editorProc.Wait()
+       if err != nil {
+               return err
+       }
+       if status := editorState.ExitCode(); status != 0 {
+               return fmt.Errorf("editor %s exited with status %d", editor, status)
+       }
+
+       newContents, err := os.ReadFile(tempFileName)
+       if err != nil {
+               return err
+       }
+
+       if bytes.Equal(newContents, buf.Bytes()) {
+               log.Default().Info("file unchanged, not reuploading to ephs")
+               return nil
+       }
+
+       newBuf := bytes.NewBuffer(newContents)
+       putResult, err := client.PutContext(ctx, cmd.StringArg("path"), uint64(len(newContents)), newBuf)
+       if err != nil {
+               return err
+       }
+
+       fmt.Println(ephs.FormatFsEntry(putResult))
+
+       return nil
+}
+
 func main() {
        ctx, cancel := context.Interruptible()
        defer cancel()
@@ -249,6 +350,17 @@ func main() {
                                        },
                                },
                        },
+                       {
+                               Name:        "edit",
+                               Description: "edit a text file",
+                               Action:      cmdEdit,
+                               Arguments: []cli.Argument{
+                                       &cli.StringArg{
+                                               Name:      "path",
+                                               UsageText: "path to edit",
+                                       },
+                               },
+                       },
                },
 
                Flags: []cli.Flag{
@@ -268,6 +380,7 @@ func main() {
        }
 
        if err := cmd.Run(ctx, os.Args); err != nil {
-               log.Default().Panic(err)
+               log.Fatal(err)
+               os.Exit(1)
        }
 }
index 1749b99e17cb1be5c762e39c8890c1f9d5434065..d4eac3b226d1f8d5d861bf12f471fbe2a523dcd9 100644 (file)
@@ -60,6 +60,7 @@ type clientImpl struct {
 }
 
 type getReader struct {
+       first  *ephs_pb.GetResponse
        stream ephs_pb.Ephs_GetClient
        buf    *bytes.Buffer
        cancel context.CancelFunc
@@ -80,6 +81,34 @@ func IsEphsPath(p string) bool {
        return strings.HasPrefix(p, KeyPrefix)
 }
 
+type notFoundError struct {
+       ephsPath string
+}
+
+func (e *notFoundError) Error() string {
+       return fmt.Sprintf("path was not found in ephs: %s", e.ephsPath)
+}
+
+var _ error = &notFoundError{}
+
+func IsNotFound(err error) bool {
+       if err == nil {
+               return false
+       }
+       _, ok := err.(*notFoundError)
+       return ok
+}
+
+func maybeMakeNotFound(err error) error {
+       if st, ok := status.FromError(err); ok {
+               if st.Code() == codes.NotFound {
+                       return &notFoundError{st.Message()}
+               }
+       }
+
+       return err
+}
+
 func FormatFsEntry(e *ephs_pb.FsEntry) string {
        if e == nil {
                return "<nil>"
@@ -194,14 +223,29 @@ func (c *clientImpl) GetContext(ctx context.Context, path string) (io.ReadCloser
                return nil, err
        }
 
-       return &getReader{stream, &bytes.Buffer{}, nil}, nil
+       // peek the first message.
+       // this is where any errors returned from the rpc function will be raised, so we
+       // need to catch this before we initialize the reader stream.
+       msg, err := stream.Recv()
+       if err != nil {
+               return nil, maybeMakeNotFound(err)
+       }
+       return &getReader{msg, stream, &bytes.Buffer{}, nil}, nil
 }
 
 func (r *getReader) Read(p []byte) (int, error) {
        if r.buf.Len() < 1 {
-               msg, err := r.stream.Recv()
-               if err != nil {
-                       return 0, err
+               var msg *ephs_pb.GetResponse
+
+               if f := r.first; f != nil {
+                       msg = f
+                       r.first = nil
+               } else {
+                       m, err := r.stream.Recv()
+                       if err != nil {
+                               return 0, err
+                       }
+                       msg = m
                }
 
                if len(msg.Chunk) == 0 {
@@ -239,7 +283,7 @@ func (c *clientImpl) StatContext(ctx context.Context, path string) (*ephs_pb.FsE
        }
        resp, err := rpc.Stat(ctx, req)
        if err != nil {
-               return nil, err
+               return nil, maybeMakeNotFound(err)
        }
 
        return resp.GetEntry(), nil
@@ -263,7 +307,7 @@ func (c *clientImpl) DeleteContext(ctx context.Context, path string, recursive b
                Recursive: recursive,
        }
        _, err = rpc.Delete(ctx, req)
-       return err
+       return maybeMakeNotFound(err)
 }
 
 func (c *clientImpl) MkDir(path string, recursive bool) error {
@@ -284,7 +328,7 @@ func (c *clientImpl) MkDirContext(ctx context.Context, path string, recursive bo
                Recursive: recursive,
        }
        _, err = rpc.MkDir(ctx, req)
-       return err
+       return maybeMakeNotFound(err)
 }
 
 func (c *clientImpl) Put(path string, size uint64, r io.Reader) (*ephs_pb.FsEntry, error) {
@@ -323,7 +367,7 @@ func (c *clientImpl) PutContext(ctx context.Context, path string, size uint64, r
                req.Chunk = req.Chunk[:n]
                err = stream.Send(req)
                if err != nil {
-                       return nil, err
+                       return nil, maybeMakeNotFound(err)
                }
                nw += uint64(n)
                if err == io.EOF {
@@ -354,7 +398,7 @@ func (c *clientImpl) Watch(ctx context.Context, path string) (<-chan *ephs_pb.Wa
        }
        stream, err := rpc.Watch(ctx, req)
        if err != nil {
-               return nil, err
+               return nil, maybeMakeNotFound(err)
        }
 
        ch := make(chan *ephs_pb.WatchResponse)
index caebdd3534c9f646df4342593bd49ce2f36f1810..32ce492291d4adc6a8474cfbe62aa55dcb7882a4 100644 (file)
@@ -7,6 +7,7 @@ import (
        "github.com/minio/minio-go/v7"
        "github.com/minio/minio-go/v7/pkg/credentials"
        "go.fuhry.dev/runtime/constants"
+       "go.fuhry.dev/runtime/utils/option"
 )
 
 var s3Endpoint = "s3." + constants.WebServicesDomain
@@ -14,21 +15,28 @@ var s3Bucket = "ephs"
 var s3Prefix = ""
 
 func WithAWSEnvCredentials() Option {
-       return &genericOption{
-               apply: func(s *ephsServicer) error {
-                       s.s3Creds = credentials.NewEnvAWS()
-                       return nil
-               },
-       }
+       return option.NewOption(func(s *ephsServicer) error {
+               s.s3Creds = credentials.NewEnvAWS()
+
+               cc := &credentials.CredContext{
+                       Endpoint: s3Endpoint,
+               }
+               _, err := s.s3Creds.GetWithContext(cc)
+
+               return err
+       })
 }
 
 func WithAWSCredentialFile(filename string) Option {
-       return &genericOption{
-               apply: func(s *ephsServicer) error {
-                       s.s3Creds = credentials.NewFileAWSCredentials(filename, "default")
-                       return nil
-               },
-       }
+       return option.NewOption(func(s *ephsServicer) error {
+               s.s3Creds = credentials.NewFileAWSCredentials(filename, "default")
+
+               cc := &credentials.CredContext{
+                       Endpoint: s3Endpoint,
+               }
+               _, err := s.s3Creds.GetWithContext(cc)
+               return err
+       })
 }
 
 func (s *ephsServicer) newS3Client() (*minio.Client, error) {
index 541f6f27b305064025dde639c42ba0679e6eb0f2..982504fbefcee1458550e2ddb3dd67032f848cda 100644 (file)
@@ -209,9 +209,8 @@ func (s *ephsServicer) getPath(ctx context.Context, path string) (*ephs_proto.Fs
        }
 
        if len(obj.Kvs) < 1 {
-               return nil, status.Errorf(
+               return nil, status.Error(
                        codes.NotFound,
-                       "object does not exist: %q",
                        path)
        }