From: Dan Fuhry Date: Fri, 21 Nov 2025 00:34:00 +0000 (-0500) Subject: [ephs] `edit` command, client library and server improvements X-Git-Url: https://go.fuhry.dev/?a=commitdiff_plain;h=a6385efb94eb7846afed0cd9c9d5f33c1f5077f4;p=runtime.git [ephs] `edit` command, client library and server improvements - fixed server failing to build since 020d9e6d because s3.go referenced the removed `genericOption` type - add custom error type for not-found errors so they are easier to distinguish - validate S3 credentials on server startup - add `edit` command to ephs commandline client to allow editing text files directly in ephs --- diff --git a/cmd/ephs_client/BUILD.bazel b/cmd/ephs_client/BUILD.bazel index ee40c32..906286d 100644 --- a/cmd/ephs_client/BUILD.bazel +++ b/cmd/ephs_client/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "//constants", "//ephs", "//utils/context", + "//utils/fsutil", "//utils/log", "@com_github_urfave_cli_v3//:cli", ], diff --git a/cmd/ephs_client/main.go b/cmd/ephs_client/main.go index b0b113d..da5aeaa 100644 --- a/cmd/ephs_client/main.go +++ b/cmd/ephs_client/main.go @@ -1,19 +1,23 @@ package main import ( + "bytes" "errors" "flag" "fmt" "io" "os" + "os/exec" "path" "strings" + "unicode/utf8" "github.com/urfave/cli/v3" "go.fuhry.dev/runtime/constants" "go.fuhry.dev/runtime/ephs" "go.fuhry.dev/runtime/utils/context" + "go.fuhry.dev/runtime/utils/fsutil" "go.fuhry.dev/runtime/utils/log" ) @@ -154,6 +158,103 @@ func cmdMkdir(ctx context.Context, cmd *cli.Command) error { return nil } +func cmdEdit(ctx context.Context, cmd *cli.Command) error { + client, err := ephs.DefaultClient() + if err != nil { + return err + } + + buf := bytes.NewBuffer(nil) + temp, err := os.CreateTemp(os.TempDir(), "ephs*") + if err != nil { + return err + } + tempFileName := temp.Name() + defer os.Remove(tempFileName) + + reader, err := client.GetContext(ctx, cmd.StringArg("path")) + if err == nil { + tee := io.TeeReader(reader, buf) + + io.Copy(temp, tee) + temp.Close() + + if !utf8.Valid(buf.Bytes()) { + return errors.New("cannot edit a binary file") + } + } else if ephs.IsNotFound(err) { + temp.Close() + log.Default().Infof("path %q not found, creating new file", cmd.StringArg("path")) + } else { + temp.Close() + return err + } + + editor := os.Getenv("EDITOR") + if editor == "" { + for _, f := range []string{"/usr/bin/vim", "/usr/bin/vi", "/usr/bin/nano"} { + if err := fsutil.FileExistsAndIsReadable(f); err == nil { + editor = f + break + } + } + } + if editor == "" { + return errors.New("EDITOR not set and cannot find any suitable fallback") + } + if !path.IsAbs(editor) { + editor, err = exec.LookPath(editor) + if err != nil { + return err + } + } + + editorProc, err := os.StartProcess( + editor, + []string{ + editor, + tempFileName, + }, + &os.ProcAttr{ + Files: []*os.File{ + os.Stdin, + os.Stdout, + os.Stderr, + }, + }, + ) + if err != nil { + return err + } + editorState, err := editorProc.Wait() + if err != nil { + return err + } + if status := editorState.ExitCode(); status != 0 { + return fmt.Errorf("editor %s exited with status %d", editor, status) + } + + newContents, err := os.ReadFile(tempFileName) + if err != nil { + return err + } + + if bytes.Equal(newContents, buf.Bytes()) { + log.Default().Info("file unchanged, not reuploading to ephs") + return nil + } + + newBuf := bytes.NewBuffer(newContents) + putResult, err := client.PutContext(ctx, cmd.StringArg("path"), uint64(len(newContents)), newBuf) + if err != nil { + return err + } + + fmt.Println(ephs.FormatFsEntry(putResult)) + + return nil +} + func main() { ctx, cancel := context.Interruptible() defer cancel() @@ -249,6 +350,17 @@ func main() { }, }, }, + { + Name: "edit", + Description: "edit a text file", + Action: cmdEdit, + Arguments: []cli.Argument{ + &cli.StringArg{ + Name: "path", + UsageText: "path to edit", + }, + }, + }, }, Flags: []cli.Flag{ @@ -268,6 +380,7 @@ func main() { } if err := cmd.Run(ctx, os.Args); err != nil { - log.Default().Panic(err) + log.Fatal(err) + os.Exit(1) } } diff --git a/ephs/client.go b/ephs/client.go index 1749b99..d4eac3b 100644 --- a/ephs/client.go +++ b/ephs/client.go @@ -60,6 +60,7 @@ type clientImpl struct { } type getReader struct { + first *ephs_pb.GetResponse stream ephs_pb.Ephs_GetClient buf *bytes.Buffer cancel context.CancelFunc @@ -80,6 +81,34 @@ func IsEphsPath(p string) bool { return strings.HasPrefix(p, KeyPrefix) } +type notFoundError struct { + ephsPath string +} + +func (e *notFoundError) Error() string { + return fmt.Sprintf("path was not found in ephs: %s", e.ephsPath) +} + +var _ error = ¬FoundError{} + +func IsNotFound(err error) bool { + if err == nil { + return false + } + _, ok := err.(*notFoundError) + return ok +} + +func maybeMakeNotFound(err error) error { + if st, ok := status.FromError(err); ok { + if st.Code() == codes.NotFound { + return ¬FoundError{st.Message()} + } + } + + return err +} + func FormatFsEntry(e *ephs_pb.FsEntry) string { if e == nil { return "" @@ -194,14 +223,29 @@ func (c *clientImpl) GetContext(ctx context.Context, path string) (io.ReadCloser return nil, err } - return &getReader{stream, &bytes.Buffer{}, nil}, nil + // peek the first message. + // this is where any errors returned from the rpc function will be raised, so we + // need to catch this before we initialize the reader stream. + msg, err := stream.Recv() + if err != nil { + return nil, maybeMakeNotFound(err) + } + return &getReader{msg, stream, &bytes.Buffer{}, nil}, nil } func (r *getReader) Read(p []byte) (int, error) { if r.buf.Len() < 1 { - msg, err := r.stream.Recv() - if err != nil { - return 0, err + var msg *ephs_pb.GetResponse + + if f := r.first; f != nil { + msg = f + r.first = nil + } else { + m, err := r.stream.Recv() + if err != nil { + return 0, err + } + msg = m } if len(msg.Chunk) == 0 { @@ -239,7 +283,7 @@ func (c *clientImpl) StatContext(ctx context.Context, path string) (*ephs_pb.FsE } resp, err := rpc.Stat(ctx, req) if err != nil { - return nil, err + return nil, maybeMakeNotFound(err) } return resp.GetEntry(), nil @@ -263,7 +307,7 @@ func (c *clientImpl) DeleteContext(ctx context.Context, path string, recursive b Recursive: recursive, } _, err = rpc.Delete(ctx, req) - return err + return maybeMakeNotFound(err) } func (c *clientImpl) MkDir(path string, recursive bool) error { @@ -284,7 +328,7 @@ func (c *clientImpl) MkDirContext(ctx context.Context, path string, recursive bo Recursive: recursive, } _, err = rpc.MkDir(ctx, req) - return err + return maybeMakeNotFound(err) } func (c *clientImpl) Put(path string, size uint64, r io.Reader) (*ephs_pb.FsEntry, error) { @@ -323,7 +367,7 @@ func (c *clientImpl) PutContext(ctx context.Context, path string, size uint64, r req.Chunk = req.Chunk[:n] err = stream.Send(req) if err != nil { - return nil, err + return nil, maybeMakeNotFound(err) } nw += uint64(n) if err == io.EOF { @@ -354,7 +398,7 @@ func (c *clientImpl) Watch(ctx context.Context, path string) (<-chan *ephs_pb.Wa } stream, err := rpc.Watch(ctx, req) if err != nil { - return nil, err + return nil, maybeMakeNotFound(err) } ch := make(chan *ephs_pb.WatchResponse) diff --git a/ephs/servicer/s3.go b/ephs/servicer/s3.go index caebdd3..32ce492 100644 --- a/ephs/servicer/s3.go +++ b/ephs/servicer/s3.go @@ -7,6 +7,7 @@ import ( "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" "go.fuhry.dev/runtime/constants" + "go.fuhry.dev/runtime/utils/option" ) var s3Endpoint = "s3." + constants.WebServicesDomain @@ -14,21 +15,28 @@ var s3Bucket = "ephs" var s3Prefix = "" func WithAWSEnvCredentials() Option { - return &genericOption{ - apply: func(s *ephsServicer) error { - s.s3Creds = credentials.NewEnvAWS() - return nil - }, - } + return option.NewOption(func(s *ephsServicer) error { + s.s3Creds = credentials.NewEnvAWS() + + cc := &credentials.CredContext{ + Endpoint: s3Endpoint, + } + _, err := s.s3Creds.GetWithContext(cc) + + return err + }) } func WithAWSCredentialFile(filename string) Option { - return &genericOption{ - apply: func(s *ephsServicer) error { - s.s3Creds = credentials.NewFileAWSCredentials(filename, "default") - return nil - }, - } + return option.NewOption(func(s *ephsServicer) error { + s.s3Creds = credentials.NewFileAWSCredentials(filename, "default") + + cc := &credentials.CredContext{ + Endpoint: s3Endpoint, + } + _, err := s.s3Creds.GetWithContext(cc) + return err + }) } func (s *ephsServicer) newS3Client() (*minio.Client, error) { diff --git a/ephs/servicer/servicer.go b/ephs/servicer/servicer.go index 541f6f2..982504f 100644 --- a/ephs/servicer/servicer.go +++ b/ephs/servicer/servicer.go @@ -209,9 +209,8 @@ func (s *ephsServicer) getPath(ctx context.Context, path string) (*ephs_proto.Fs } if len(obj.Kvs) < 1 { - return nil, status.Errorf( + return nil, status.Error( codes.NotFound, - "object does not exist: %q", path) }