]> go.fuhry.dev Git - runtime.git/commitdiff
Add ephs ("ephemeral filesystem")
authorDan Fuhry <dan@fuhry.com>
Thu, 6 Nov 2025 11:53:00 +0000 (06:53 -0500)
committerDan Fuhry <dan@fuhry.com>
Sun, 9 Nov 2025 12:24:03 +0000 (07:24 -0500)
18 files changed:
config_watcher/backend_ephs.go [new file with mode: 0644]
config_watcher/backend_local.go [new file with mode: 0644]
config_watcher/watcher.go [new file with mode: 0644]
ephs/client.go [new file with mode: 0644]
ephs/servicer/acl.go [new file with mode: 0644]
ephs/servicer/acl_test.go [new file with mode: 0644]
ephs/servicer/fs_object.go [new file with mode: 0644]
ephs/servicer/s3.go [new file with mode: 0644]
ephs/servicer/servicer.go [new file with mode: 0644]
ephs/servicer/writer.go [new file with mode: 0644]
proto/service/ephs/Makefile [new file with mode: 0644]
proto/service/ephs/service.pb.go [new file with mode: 0644]
proto/service/ephs/service.proto [new file with mode: 0644]
proto/service/ephs/service_grpc.pb.go [new file with mode: 0644]
proto/service/ephs/types.pb.go [new file with mode: 0644]
proto/service/ephs/types.proto [new file with mode: 0644]
utils/stringmatch/serialization2.go [new file with mode: 0644]
utils/stringmatch/serialization2_test.go [new file with mode: 0644]

diff --git a/config_watcher/backend_ephs.go b/config_watcher/backend_ephs.go
new file mode 100644 (file)
index 0000000..7d9d088
--- /dev/null
@@ -0,0 +1,65 @@
+package config_watcher
+
+import (
+       "context"
+       "io"
+
+       "go.fuhry.dev/runtime/ephs"
+)
+
+type ephsConfigWatcher struct {
+       f      string
+       client ephs.Client
+       ctx    context.Context
+       cancel context.CancelFunc
+       ch     <-chan *ephs.WatchResponse
+}
+
+var _ ConfigWatcher = &ephsConfigWatcher{}
+
+func (w *ephsConfigWatcher) Next() (*Update, error) {
+       select {
+       case <-w.ch:
+               reader, err := w.client.Get(w.f)
+               if err != nil {
+                       return nil, err
+               }
+
+               contents, err := io.ReadAll(reader)
+               if err != nil {
+                       return nil, err
+               }
+
+               return &Update{
+                       Contents: contents,
+               }, nil
+       case <-w.ctx.Done():
+               return nil, w.ctx.Err()
+       }
+}
+
+func (w *ephsConfigWatcher) Close() error {
+       w.cancel()
+       return nil
+}
+
+func newEphsConfigWatcher(ctx context.Context, filePath string) (*ephsConfigWatcher, error) {
+       client, err := ephs.DefaultClient()
+       if err != nil {
+               return nil, err
+       }
+
+       ctx, cancel := context.WithCancel(ctx)
+
+       watchCh, err := client.Watch(ctx, filePath)
+
+       w := &ephsConfigWatcher{
+               f:      filePath,
+               client: client,
+               ctx:    ctx,
+               cancel: cancel,
+               ch:     watchCh,
+       }
+
+       return w, nil
+}
diff --git a/config_watcher/backend_local.go b/config_watcher/backend_local.go
new file mode 100644 (file)
index 0000000..5449f97
--- /dev/null
@@ -0,0 +1,94 @@
+package config_watcher
+
+import (
+       "fmt"
+       "io"
+       "os"
+       "sync"
+
+       "go.fuhry.dev/runtime/mtls/fsnotify"
+       "go.fuhry.dev/runtime/utils/log"
+)
+
+type localConfigWatcher struct {
+       f       string
+       written bool
+       mu      sync.Mutex
+       ch      chan bool
+       log     log.Logger
+}
+
+func (w *localConfigWatcher) Next() (*Update, error) {
+       if v := <-w.ch; !v {
+               return nil, io.EOF
+       }
+
+       contents, err := os.ReadFile(w.f)
+       if err != nil {
+               return nil, err
+       }
+       return &Update{
+               Contents: contents,
+       }, nil
+}
+
+func (w *localConfigWatcher) Close() error {
+       w.mu.Lock()
+       defer w.mu.Unlock()
+
+       if err := fsnotify.Unsubscribe(w.f); err != nil {
+               return err
+       }
+
+       close(w.ch)
+       w.ch = nil
+       return nil
+}
+
+func (w *localConfigWatcher) notif(filePath string, op fsnotify.Op) {
+       w.mu.Lock()
+       defer w.mu.Unlock()
+
+       w.log.V(1).Debugf("event(path=%s, op=%v)", filePath, op)
+
+       if w.ch == nil {
+               return
+       }
+
+       if filePath != w.f {
+               return
+       }
+
+       if op.Has(fsnotify.Write) {
+               w.written = true
+       }
+       if w.written && op.Has(fsnotify.Close) {
+               w.written = false
+               w.ch <- true
+       }
+}
+
+var _ ConfigWatcher = &localConfigWatcher{}
+
+func newLocalConfigWatcher(filePath string) (*localConfigWatcher, error) {
+       filePath = fsnotify.RealPath(filePath)
+
+       w := &localConfigWatcher{
+               f:   filePath,
+               ch:  make(chan bool),
+               log: log.Default().WithPrefix(fmt.Sprintf("localConfigWatcher(%s)", filePath)),
+       }
+
+       err := fsnotify.NotifyPath(filePath, w.notif)
+       if err != nil {
+               close(w.ch)
+               return nil, err
+       }
+
+       // send first update immediately
+       go (func() {
+               w.ch <- true
+       })()
+
+       return w, nil
+}
diff --git a/config_watcher/watcher.go b/config_watcher/watcher.go
new file mode 100644 (file)
index 0000000..d719738
--- /dev/null
@@ -0,0 +1,24 @@
+package config_watcher
+
+import (
+       "context"
+
+       "go.fuhry.dev/runtime/ephs"
+)
+
+type ConfigWatcher interface {
+       Next() (*Update, error)
+       Close() error
+}
+
+type Update struct {
+       Contents []byte
+}
+
+func Watch(ctx context.Context, filePath string) (ConfigWatcher, error) {
+       if ephs.IsEphsPath(filePath) {
+               return newEphsConfigWatcher(ctx, filePath)
+       }
+
+       return newLocalConfigWatcher(filePath)
+}
diff --git a/ephs/client.go b/ephs/client.go
new file mode 100644 (file)
index 0000000..baba8bf
--- /dev/null
@@ -0,0 +1,467 @@
+package ephs
+
+import (
+       "bytes"
+       "context"
+       "errors"
+       "fmt"
+       "io"
+       "math"
+       "strings"
+       "sync"
+       "time"
+
+       "github.com/quic-go/quic-go"
+       "go.fuhry.dev/runtime/grpc"
+       "go.fuhry.dev/runtime/mtls"
+       ephs_pb "go.fuhry.dev/runtime/proto/service/ephs"
+       "go.fuhry.dev/runtime/utils/log"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
+)
+
+const KeyPrefix = "/ephs/"
+const ChunkSize = 262144
+
+const formatEntryDateFormat = "Monday, 2 Jan 2006 15:04:05 -0700"
+
+type WatchResponse = ephs_pb.WatchResponse
+
+type Client interface {
+       Get(path string) (io.Reader, error)
+       GetContext(ctx context.Context, path string) (io.ReadCloser, error)
+
+       Stat(path string) (*ephs_pb.FsEntry, error)
+       StatContext(ctx context.Context, path string) (*ephs_pb.FsEntry, error)
+
+       Put(path string, size uint64, content io.Reader) (*ephs_pb.FsEntry, error)
+       PutContext(ctx context.Context, path string, size uint64, content io.Reader) (*ephs_pb.FsEntry, error)
+
+       Delete(path string, recursive bool) error
+       DeleteContext(ctx context.Context, path string, recursive bool) error
+
+       MkDir(path string, recursive bool) error
+       MkDirContext(ctx context.Context, path string, recursive bool) error
+
+       Watch(ctx context.Context, path string) (<-chan *ephs_pb.WatchResponse, error)
+}
+
+type ClientOption interface {
+       Apply(*clientImpl) error
+}
+
+type genericClientOption struct {
+       apply func(*clientImpl) error
+}
+
+type clientImpl struct {
+       defaultCtx     context.Context
+       defaultTimeout time.Duration
+       id             mtls.Identity
+
+       client grpc.Client
+       conn   *grpc.ClientConn
+}
+
+type getReader struct {
+       stream ephs_pb.Ephs_GetClient
+       buf    *bytes.Buffer
+       cancel context.CancelFunc
+}
+
+var DefaultClientContext context.Context
+
+var ephsQuicConfig = &quic.Config{
+       HandshakeIdleTimeout: 5 * time.Second,
+       MaxIdleTimeout:       120 * time.Minute,
+       KeepAlivePeriod:      5 * time.Second,
+}
+
+var defaultClient Client
+var defaultClientMu sync.Mutex
+
+func IsEphsPath(p string) bool {
+       return strings.HasPrefix(p, KeyPrefix)
+}
+
+func FormatFsEntry(e *ephs_pb.FsEntry) string {
+       if e == nil {
+               return "<nil>"
+       }
+
+       var out string
+
+       out += fmt.Sprintf("Owner:           %s\n", e.Owner)
+       out += fmt.Sprintf("Version:         %d\n", e.Version)
+       out += fmt.Sprintf("Created:         %s\n", e.Created.AsTime().Format(formatEntryDateFormat))
+       out += fmt.Sprintf("Modified:        %s\n", e.Modified.AsTime().Format(formatEntryDateFormat))
+
+       if e.Content.GetDirectory() != nil {
+               children := len(e.Content.GetDirectory().GetEntries())
+               out += fmt.Sprintf("Children:        %d\n", children)
+               if children > 0 {
+                       out += strings.Repeat("-", 70) + "\nEntries:\n"
+                       for _, child := range e.Content.GetDirectory().GetEntries() {
+                               dirMarker := "    "
+                               if child.Directory {
+                                       dirMarker = "[d] "
+                               }
+
+                               out += dirMarker + child.Name
+                               if child.Directory {
+                                       out += "/"
+                               }
+                               out += "\n"
+                       }
+               }
+       } else {
+               out += fmt.Sprintf("Size:            %d bytes (%s)\n", e.Size, humanFilesize(e.Size))
+       }
+
+       if f := e.Content.GetFile(); f != nil {
+               out += "Storage:         inline\n"
+               out += fmt.Sprintf("Compression:     %s\n", f.Compression.String())
+               out += fmt.Sprintf("Compressed size: %d bytes (%s)\n", len(f.Content), humanFilesize(uint64(len(f.Content))))
+       } else if f := e.Content.GetLargeFile(); f != nil {
+               out += "Storage:         S3\n"
+               out += fmt.Sprintf("Key:             %s\n", f.Key)
+       }
+
+       return out
+}
+
+func (o *genericClientOption) Apply(c *clientImpl) error {
+       return o.apply(c)
+}
+
+func WithDefaultTimeout(d time.Duration) ClientOption {
+       return &genericClientOption{
+               apply: func(c *clientImpl) error {
+                       c.defaultTimeout = d
+                       return nil
+               },
+       }
+}
+
+func DefaultClient() (Client, error) {
+       defaultClientMu.Lock()
+       defer defaultClientMu.Unlock()
+
+       if defaultClient != nil {
+               return defaultClient, nil
+       }
+
+       if DefaultClientContext == nil {
+               DefaultClientContext = context.Background()
+       }
+
+       client, err := NewClient(DefaultClientContext, mtls.DefaultIdentity())
+       if err != nil {
+               return nil, err
+       }
+
+       defaultClient = client
+       return defaultClient, nil
+}
+
+func NewClient(ctx context.Context, localId mtls.Identity, opts ...ClientOption) (Client, error) {
+       cl := &clientImpl{
+               defaultCtx:     ctx,
+               defaultTimeout: 15 * time.Second,
+               id:             localId,
+       }
+
+       for _, opt := range opts {
+               if err := opt.Apply(cl); err != nil {
+                       return nil, err
+               }
+       }
+
+       return cl, nil
+}
+
+func (c *clientImpl) Get(path string) (io.Reader, error) {
+       ctx, cancel := context.WithTimeout(c.defaultCtx, c.defaultTimeout)
+
+       reader, err := c.GetContext(ctx, path)
+       if err != nil {
+               return nil, err
+       }
+
+       reader.(*getReader).cancel = cancel
+
+       return reader, nil
+}
+
+func (c *clientImpl) GetContext(ctx context.Context, path string) (io.ReadCloser, error) {
+       rpc, err := c.grpcClient()
+       if err != nil {
+               return nil, err
+       }
+       req := &ephs_pb.GetRequest{Path: path}
+       stream, err := rpc.Get(ctx, req)
+       if err != nil {
+               return nil, err
+       }
+
+       return &getReader{stream, &bytes.Buffer{}, nil}, nil
+}
+
+func (r *getReader) Read(p []byte) (int, error) {
+       if r.buf.Len() < 1 {
+               msg, err := r.stream.Recv()
+               if err != nil {
+                       return 0, err
+               }
+
+               if len(msg.Chunk) == 0 {
+                       return 0, io.EOF
+               }
+
+               r.buf.Write(msg.Chunk)
+       }
+
+       return r.buf.Read(p)
+}
+
+func (r *getReader) Close() error {
+       if r.cancel != nil {
+               r.cancel()
+       }
+       return r.stream.CloseSend()
+}
+
+func (c *clientImpl) Stat(path string) (*ephs_pb.FsEntry, error) {
+       ctx, cancel := context.WithTimeout(c.defaultCtx, c.defaultTimeout)
+       defer cancel()
+
+       return c.StatContext(ctx, path)
+}
+
+func (c *clientImpl) StatContext(ctx context.Context, path string) (*ephs_pb.FsEntry, error) {
+       rpc, err := c.grpcClient()
+       if err != nil {
+               return nil, err
+       }
+
+       req := &ephs_pb.GetRequest{
+               Path: path,
+       }
+       resp, err := rpc.Stat(ctx, req)
+       if err != nil {
+               return nil, err
+       }
+
+       return resp.GetEntry(), nil
+}
+
+func (c *clientImpl) Delete(path string, recursive bool) error {
+       ctx, cancel := context.WithTimeout(c.defaultCtx, c.defaultTimeout)
+       defer cancel()
+
+       return c.DeleteContext(ctx, path, recursive)
+}
+
+func (c *clientImpl) DeleteContext(ctx context.Context, path string, recursive bool) error {
+       rpc, err := c.grpcClient()
+       if err != nil {
+               return err
+       }
+
+       req := &ephs_pb.DeleteRequest{
+               Path:      path,
+               Recursive: recursive,
+       }
+       _, err = rpc.Delete(ctx, req)
+       return err
+}
+
+func (c *clientImpl) MkDir(path string, recursive bool) error {
+       ctx, cancel := context.WithTimeout(c.defaultCtx, c.defaultTimeout)
+       defer cancel()
+
+       return c.MkDirContext(ctx, path, recursive)
+}
+
+func (c *clientImpl) MkDirContext(ctx context.Context, path string, recursive bool) error {
+       rpc, err := c.grpcClient()
+       if err != nil {
+               return err
+       }
+
+       req := &ephs_pb.MkDirRequest{
+               Path:      path,
+               Recursive: recursive,
+       }
+       _, err = rpc.MkDir(ctx, req)
+       return err
+}
+
+func (c *clientImpl) Put(path string, size uint64, r io.Reader) (*ephs_pb.FsEntry, error) {
+       ctx, cancel := context.WithTimeout(c.defaultCtx, c.defaultTimeout)
+       defer cancel()
+
+       return c.PutContext(ctx, path, size, r)
+}
+
+func (c *clientImpl) PutContext(ctx context.Context, path string, size uint64, r io.Reader) (*ephs_pb.FsEntry, error) {
+       rpc, err := c.grpcClient()
+       if err != nil {
+               return nil, err
+       }
+
+       req := &ephs_pb.PutRequest{
+               Path:    path,
+               Size:    size,
+               Version: 0,
+               Chunk:   make([]byte, min(size, ChunkSize)),
+       }
+       stream, err := rpc.Put(ctx)
+       if err != nil {
+               return nil, err
+       }
+
+       for nw := uint64(0); nw < size; {
+               n, err := r.Read(req.Chunk)
+               if n == 0 || err == io.EOF {
+                       break
+               }
+               if err != nil {
+                       stream.CloseSend()
+                       return nil, err
+               }
+               req.Chunk = req.Chunk[:n]
+               err = stream.Send(req)
+               if err != nil {
+                       return nil, err
+               }
+               nw += uint64(n)
+               if err == io.EOF {
+                       break
+               }
+       }
+
+       response, err := stream.CloseAndRecv()
+       if err != nil && err != io.EOF {
+               return nil, err
+       }
+
+       if response != nil && response.GetEntry() != nil {
+               return response.GetEntry(), nil
+       } else {
+               return nil, errors.New("upload succeeded but received nil response")
+       }
+}
+
+func (c *clientImpl) Watch(ctx context.Context, path string) (<-chan *ephs_pb.WatchResponse, error) {
+       rpc, err := c.grpcClient()
+       if err != nil {
+               return nil, err
+       }
+
+       req := &ephs_pb.GetRequest{
+               Path: path,
+       }
+       stream, err := rpc.Watch(ctx, req)
+       if err != nil {
+               return nil, err
+       }
+
+       ch := make(chan *ephs_pb.WatchResponse)
+
+       go c.watch(ctx, path, stream, ch)
+
+       return ch, nil
+}
+
+func (c *clientImpl) watch(origCtx context.Context, origPath string, stream ephs_pb.Ephs_WatchClient, ch chan *ephs_pb.WatchResponse) {
+       defer close(ch)
+
+       for {
+               msg, err := stream.Recv()
+               if err != nil {
+                       if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable {
+                               stream.CloseSend()
+
+                               if rpc, err := c.grpcClient(); err == nil {
+                                       stream, err = rpc.Watch(origCtx, &ephs_pb.GetRequest{Path: origPath})
+                                       if err == nil {
+                                               log.Default().Noticef("reconnected watcher with new stream")
+                                               continue
+                                       } else {
+                                               log.Default().Errorf("error reestablishing watch stream: %v", err)
+                                       }
+                               } else {
+                                       log.Default().Errorf("error reconnecting: %v", err)
+                               }
+                       }
+                       log.Default().Errorf("error receiving watch stream: %v", err)
+                       return
+               }
+
+               timer := time.NewTimer(c.defaultTimeout)
+               select {
+               case ch <- msg:
+                       // do nothing - we've written to the channel and that's all we require
+               case <-timer.C:
+                       return
+               }
+       }
+}
+
+func (c *clientImpl) grpcClient() (ephs_pb.EphsClient, error) {
+       var err error
+       serverId := mtls.NewServiceIdentity("ephs")
+
+       if c.client == nil {
+               c.client, err = grpc.NewGrpcClient(c.defaultCtx, serverId, c.id,
+                       grpc.WithConnectionFactory(&grpc.QUICConnectionFactory{
+                               QUICConfig: ephsQuicConfig.Clone(),
+                       }))
+               if err != nil {
+                       return nil, fmt.Errorf("error creating grpc client: %T: %v", err, err)
+               }
+       }
+
+       deadline, ok := c.defaultCtx.Deadline()
+       if !ok {
+               deadline = time.Now().Add(60 * time.Second)
+       }
+       var (
+               conn    *grpc.ClientConn
+               lastErr error
+       )
+
+       for time.Now().Before(deadline) {
+               conn, lastErr = c.client.Conn()
+               if lastErr == nil {
+                       break
+               }
+               log.Default().Warningf("error establishing grpc connection to ephs server, retrying in 1s: %T: %v", lastErr, lastErr)
+               select {
+               case <-c.defaultCtx.Done():
+                       log.Default().Error(c.defaultCtx.Err())
+                       return nil, c.defaultCtx.Err()
+               case <-time.NewTimer(1 * time.Second).C:
+               }
+       }
+       if lastErr != nil {
+               return nil, lastErr
+       }
+
+       confFsCl := ephs_pb.NewEphsClient(conn)
+       return confFsCl, nil
+}
+
+func humanFilesize(s uint64) string {
+       if s > uint64(0.9*math.Pow(2, 40)) {
+               return fmt.Sprintf("%.2f TiB", float64(s)/math.Pow(2, 40))
+       } else if s > uint64(0.9*math.Pow(2, 30)) {
+               return fmt.Sprintf("%.2f GiB", float64(s)/math.Pow(2, 30))
+       } else if s > uint64(0.9*math.Pow(2, 20)) {
+               return fmt.Sprintf("%.2f MiB", float64(s)/math.Pow(2, 20))
+       } else if s > uint64(0.9*math.Pow(2, 10)) {
+               return fmt.Sprintf("%.2f KiB", float64(s)/math.Pow(2, 10))
+       }
+       return fmt.Sprintf("%d bytes", s)
+}
diff --git a/ephs/servicer/acl.go b/ephs/servicer/acl.go
new file mode 100644 (file)
index 0000000..e9cf055
--- /dev/null
@@ -0,0 +1,109 @@
+package servicer
+
+import (
+       "bytes"
+       "errors"
+       "fmt"
+       "os"
+
+       "gopkg.in/yaml.v3"
+
+       "go.fuhry.dev/runtime/utils/stringmatch"
+)
+
+type AclRule struct {
+       Principal *stringmatch.NewSyntaxMatchRule `yaml:"principal"`
+       Key       *stringmatch.NewSyntaxMatchRule `yaml:"key"`
+       Invert    bool                            `yaml:"invert"`
+}
+
+type Acl struct {
+       Rules []*AclRule `yaml:"rules"`
+}
+
+func LoadAcl(filePath string) (*Acl, error) {
+       fp, err := os.OpenFile(filePath, os.O_RDONLY, os.FileMode(0))
+       if err != nil {
+               return nil, err
+       }
+       defer fp.Close()
+
+       acl := &Acl{}
+       decoder := yaml.NewDecoder(fp)
+       if err := decoder.Decode(acl); err != nil {
+               return nil, err
+       }
+
+       if err := acl.Validate(); err != nil {
+               return nil, err
+       }
+
+       return acl, nil
+}
+
+func LoadAclString(text string) (*Acl, error) {
+       buf := bytes.NewBufferString(text)
+       acl := &Acl{}
+       decoder := yaml.NewDecoder(buf)
+       if err := decoder.Decode(acl); err != nil {
+               return nil, err
+       }
+
+       if err := acl.Validate(); err != nil {
+               return nil, err
+       }
+
+       return acl, nil
+}
+
+func (r *AclRule) Match(principal, key string) bool {
+       if !r.Principal.Match(principal) {
+               return false
+       }
+
+       keyMatcher, err := r.Key.Matcher()
+       if err != nil {
+               return false
+       }
+
+       keyMatcher = keyMatcher.Sub(map[string]string{"principal": principal})
+
+       return keyMatcher.Match(key)
+}
+
+func (a *Acl) Check(principal, key string) bool {
+       for _, r := range a.Rules {
+               if r.Match(principal, key) {
+                       return !r.Invert
+               }
+       }
+
+       return false
+}
+
+func (a *Acl) Validate() error {
+       if len(a.Rules) < 1 {
+               return errors.New("access control list is empty")
+       }
+
+       var errs []error
+       for i, rule := range a.Rules {
+               if rule.Key == nil {
+                       errs = append(errs, fmt.Errorf("rule %d missing key", i))
+               } else {
+                       if _, err := rule.Key.Matcher(); err != nil {
+                               errs = append(errs, fmt.Errorf("rule %d key: %v", i, err))
+                       }
+               }
+
+               if rule.Principal == nil {
+                       errs = append(errs, fmt.Errorf("rule %d missing principal", i))
+               } else {
+                       if _, err := rule.Principal.Matcher(); err != nil {
+                               errs = append(errs, fmt.Errorf("rule %d principal: %v", i, err))
+                       }
+               }
+       }
+
+       return errors.Join(errs...)
+}
diff --git a/ephs/servicer/acl_test.go b/ephs/servicer/acl_test.go
new file mode 100644 (file)
index 0000000..d744c7a
--- /dev/null
@@ -0,0 +1,103 @@
+package servicer
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+const testRules = `
+rules:
+  - principal:
+      exact: bob
+    key:
+      exact: /{{principal}}/can/write/this
+  - principal:
+      any: true
+    key:
+      exact: /writable/by/self/{{principal}}
+  - principal:
+      or:
+        - exact: bob
+        - exact: suzie
+    key:
+      prefix: /or/test/{{principal}}/
+`
+
+func TestAcl(t *testing.T) {
+       type testCase struct {
+               description, princ, key string
+               expect                  bool
+       }
+
+       acl, err := LoadAclString(testRules)
+       assert.NoError(t, err)
+
+       testCases := []testCase{
+               {
+                       "bob has access to /bob/can/write/this",
+                       "bob",
+                       "/bob/can/write/this",
+                       true,
+               },
+               {
+                       "bob does not have access to /suzie/can/write/this",
+                       "bob",
+                       "/suzie/can/write/this",
+                       false,
+               },
+               {
+                       "suzie does not have access to /suzie/can/write/this",
+                       "suzie",
+                       "/suzie/can/write/this",
+                       false,
+               },
+               {
+                       "bob has access to /writable/by/self/bob",
+                       "bob",
+                       "/writable/by/self/bob",
+                       true,
+               },
+               {
+                       "suzie has access to /writable/by/self/suzie",
+                       "suzie",
+                       "/writable/by/self/suzie",
+                       true,
+               },
+               {
+                       "bob has access to /writable/by/self/suzie",
+                       "suzie",
+                       "/writable/by/self/suzie",
+                       true,
+               },
+               {
+                       "bob has access to /or/test/bob/foo",
+                       "bob",
+                       "/or/test/bob/foo",
+                       true,
+               },
+               {
+                       "suzie has access to /or/test/suzie/foo",
+                       "suzie",
+                       "/or/test/suzie/foo",
+                       true,
+               },
+               {
+                       "frank does not have access to /or/test/frank/foo",
+                       "frank",
+                       "/or/test/frank/foo",
+                       false,
+               },
+               {
+                       "bob does not have access /no/rule/for/this",
+                       "bob",
+                       "/no/rule/for/this",
+                       false,
+               },
+       }
+
+       for i, tc := range testCases {
+               assert.Equal(t, tc.expect, acl.Check(tc.princ, tc.key),
+                       "test case %d: %s", i, tc.description)
+       }
+}
diff --git a/ephs/servicer/fs_object.go b/ephs/servicer/fs_object.go
new file mode 100644 (file)
index 0000000..94df6c2
--- /dev/null
@@ -0,0 +1,204 @@
+package servicer
+
+import (
+       "bytes"
+       "compress/gzip"
+       "context"
+       "errors"
+       "fmt"
+       "io"
+       "math/rand"
+       "regexp"
+
+       "github.com/minio/minio-go/v7"
+       "go.fuhry.dev/runtime/ephs"
+       ephs_proto "go.fuhry.dev/runtime/proto/service/ephs"
+
+       "go.fuhry.dev/runtime/utils/log"
+)
+
+const LargeFileThreshold = ephs.ChunkSize
+
+type FsEntry struct {
+       *ephs_proto.FsEntry
+}
+
+var ErrDirectory = errors.New("is a directory")
+var ErrUnknownCompression = errors.New("file compressed with unknown scheme")
+
+var RegexpLargeFileKey = regexp.MustCompile(`^[0-9A-Za-z_-]{64}$`)
+
+func (e *FsEntry) GetContents(ctx context.Context, s3 *minio.Client) (io.Reader, error) {
+       switch {
+       case e.FsEntry.Content.GetDirectory() != nil:
+               return nil, ErrDirectory
+       case e.FsEntry.Content.GetFile() != nil:
+               return e.readFile()
+       case e.FsEntry.Content.GetLargeFile() != nil:
+               return e.readLargeFile(ctx, s3)
+       default:
+               return nil, errors.New("file content not populated with any known type")
+       }
+}
+
+func (e *FsEntry) SetContents(ctx context.Context, s3 *minio.Client, r io.Reader, size uint64) error {
+       if e.FsEntry.Content.GetDirectory() != nil {
+               return ErrDirectory
+       }
+
+       var oldLargeFile ephs_proto.FsEntry_LargeFile
+       mustCleanupLargeFile := e.FsEntry.Content.GetLargeFile() != nil
+       if mustCleanupLargeFile {
+               oldLargeFile = *e.FsEntry.Content.GetLargeFile()
+       }
+
+       var err error
+       if size >= LargeFileThreshold {
+               err = e.writeLargeFile(ctx, s3, r, size)
+       } else {
+               err = e.writeFile(r, size)
+       }
+       if err != nil {
+               return err
+       }
+
+       if mustCleanupLargeFile {
+               log.Default().Infof("mustCleanupLargeFile: %+v", oldLargeFile)
+
+               err = e.cleanupOldLargeFile(ctx, s3, &oldLargeFile)
+               if err != nil {
+                       log.Default().Alert(
+                               "error while cleaning up old large file with key %q: %v",
+                               oldLargeFile.Key,
+                               err)
+               }
+       }
+
+       return nil
+}
+
+func (e *FsEntry) DeleteContents(ctx context.Context, s3 *minio.Client) error {
+       switch {
+       case e.FsEntry.Content.GetDirectory() != nil:
+               return ErrDirectory
+       case e.FsEntry.Content.GetFile() != nil:
+               e.FsEntry.Content = nil
+               return nil
+       case e.FsEntry.Content.GetLargeFile() != nil:
+               return e.cleanupOldLargeFile(ctx, s3, e.Content.GetLargeFile())
+       default:
+               return errors.New("file content not populated with any known type")
+       }
+}
+
+func (e *FsEntry) readFile() (io.Reader, error) {
+       file := e.FsEntry.Content.GetFile()
+       if file == nil {
+               return nil, errors.New("internal error: readFile() called but file is unpopulated")
+       }
+
+       switch file.Compression {
+       case ephs_proto.FsEntry_File_UNCOMPRESSED:
+               return bytes.NewBuffer(file.GetContent()), nil
+       case ephs_proto.FsEntry_File_GZIP:
+               return gzip.NewReader(bytes.NewBuffer(file.GetContent()))
+       default:
+               return nil, ErrUnknownCompression
+       }
+}
+
+func (e *FsEntry) readLargeFile(ctx context.Context, s3 *minio.Client) (io.Reader, error) {
+       if e.Content == nil || e.Content.GetLargeFile() == nil {
+               return nil, errors.New("not a large file")
+       }
+
+       key := e.Content.GetLargeFile().GetKey()
+       if !RegexpLargeFileKey.MatchString(key) {
+               return nil, errors.New("large file key contains invalid characters or is the wrong length")
+       }
+
+       return s3.GetObject(ctx, s3Bucket, s3Prefix+key, minio.GetObjectOptions{})
+}
+
+func (e *FsEntry) writeFile(r io.Reader, size uint64) error {
+       var compression = ephs_proto.FsEntry_File_GZIP
+
+       if f := e.FsEntry.Content.GetFile(); f != nil {
+               compression = f.Compression
+       }
+
+       f := &ephs_proto.FsEntry_File{
+               Compression: compression,
+       }
+
+       switch compression {
+       case ephs_proto.FsEntry_File_UNCOMPRESSED:
+               var err error
+               f.Content, err = io.ReadAll(r)
+               if err != nil {
+                       return err
+               }
+               if uint64(len(f.Content)) != size {
+                       return fmt.Errorf("underrun writing file: expected %d bytes, wrote %d", size, len(f.Content))
+               }
+       case ephs_proto.FsEntry_File_GZIP:
+               buf := &bytes.Buffer{}
+               gz := gzip.NewWriter(buf)
+               nw, _ := io.Copy(gz, r)
+               gz.Close()
+               f.Content = append(f.Content, buf.Bytes()...)
+
+               if nw != int64(size) {
+                       return fmt.Errorf("underrun writing file: expected %d bytes, wrote %d", size, nw)
+               }
+       }
+
+       e.FsEntry.Content = &ephs_proto.FsEntry_Content{
+               Content: &ephs_proto.FsEntry_Content_File{
+                       File: f,
+               },
+       }
+
+       return nil
+}
+
+func (e *FsEntry) writeLargeFile(ctx context.Context, s3 *minio.Client, r io.Reader, size uint64) error {
+       key := newLargeFileKey()
+
+       log.Default().Noticef("attempting to PutObject into bucket %s at key %s", s3Bucket, key)
+       upload, err := s3.PutObject(ctx, s3Bucket, s3Prefix+key, r, int64(size), minio.PutObjectOptions{})
+       if err != nil {
+               log.Default().Error(err)
+               return err
+       }
+
+       log.Default().Infof("upload info: %+v", upload)
+
+       e.FsEntry.Content = &ephs_proto.FsEntry_Content{
+               Content: &ephs_proto.FsEntry_Content_LargeFile{
+                       LargeFile: &ephs_proto.FsEntry_LargeFile{
+                               Key: key,
+                       },
+               },
+       }
+
+       return nil
+}
+
+func (e *FsEntry) cleanupOldLargeFile(ctx context.Context, s3 *minio.Client, lf *ephs_proto.FsEntry_LargeFile) error {
+       key := lf.GetKey()
+       if !RegexpLargeFileKey.MatchString(key) {
+               return errors.New("large file key contains invalid characters or is the wrong length")
+       }
+
+       return s3.RemoveObject(ctx, s3Bucket, s3Prefix+key, minio.RemoveObjectOptions{})
+}
+
+func newLargeFileKey() string {
+       const charset = `abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-`
+       var out []byte
+       for i := 0; i < 64; i++ {
+               out = append(out, charset[rand.Intn(len(charset))])
+       }
+       return string(out)
+}
diff --git a/ephs/servicer/s3.go b/ephs/servicer/s3.go
new file mode 100644 (file)
index 0000000..caebdd3
--- /dev/null
@@ -0,0 +1,49 @@
+package servicer
+
+import (
+       "errors"
+       "flag"
+
+       "github.com/minio/minio-go/v7"
+       "github.com/minio/minio-go/v7/pkg/credentials"
+       "go.fuhry.dev/runtime/constants"
+)
+
+var s3Endpoint = "s3." + constants.WebServicesDomain
+var s3Bucket = "ephs"
+var s3Prefix = ""
+
+func WithAWSEnvCredentials() Option {
+       return &genericOption{
+               apply: func(s *ephsServicer) error {
+                       s.s3Creds = credentials.NewEnvAWS()
+                       return nil
+               },
+       }
+}
+
+func WithAWSCredentialFile(filename string) Option {
+       return &genericOption{
+               apply: func(s *ephsServicer) error {
+                       s.s3Creds = credentials.NewFileAWSCredentials(filename, "default")
+                       return nil
+               },
+       }
+}
+
+func (s *ephsServicer) newS3Client() (*minio.Client, error) {
+       if !flag.Parsed() {
+               return nil, errors.New("flags not yet parsed")
+       }
+
+       return minio.New(s3Endpoint, &minio.Options{
+               Creds:  s.s3Creds,
+               Secure: true,
+       })
+}
+
+func init() {
+       flag.StringVar(&s3Endpoint, "ephs.s3-endpoint", s3Endpoint, "S3 endpoint")
+       flag.StringVar(&s3Bucket, "ephs.s3-bucket", s3Bucket, "S3 bucket name")
+       flag.StringVar(&s3Prefix, "ephs.s3-prefix", s3Prefix, "S3 object prefix")
+}
diff --git a/ephs/servicer/servicer.go b/ephs/servicer/servicer.go
new file mode 100644 (file)
index 0000000..a95d72a
--- /dev/null
@@ -0,0 +1,709 @@
+package servicer
+
+import (
+       "bytes"
+       "context"
+       "errors"
+       "flag"
+       "fmt"
+       "io"
+       "path"
+       "slices"
+       "strings"
+       "sync"
+
+       "github.com/minio/minio-go/v7"
+       "github.com/minio/minio-go/v7/pkg/credentials"
+       etcd_client "go.etcd.io/etcd/client/v3"
+       etcd_concurrency "go.etcd.io/etcd/client/v3/concurrency"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/peer"
+       "google.golang.org/grpc/status"
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/types/known/emptypb"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       "go.fuhry.dev/runtime/ephs"
+       "go.fuhry.dev/runtime/grpc"
+       "go.fuhry.dev/runtime/mtls"
+       "go.fuhry.dev/runtime/mtls/fsnotify"
+       ephs_proto "go.fuhry.dev/runtime/proto/service/ephs"
+       "go.fuhry.dev/runtime/sd"
+       "go.fuhry.dev/runtime/utils/hostname"
+       "go.fuhry.dev/runtime/utils/log"
+)
+
+type Option interface {
+       Apply(*ephsServicer) error
+}
+
+type genericOption struct {
+       apply func(*ephsServicer) error
+}
+
+type msgWithPath interface {
+       GetPath() string
+}
+
+type ephsServicer struct {
+       ephs_proto.EphsServer
+
+       acl      *Acl
+       logger   log.Logger
+       clientMu sync.Mutex
+       clients  map[string]*etcd_client.Client
+       s3Creds  *credentials.Credentials
+       s3Client *minio.Client
+}
+
+var cell string
+
+func (o *genericOption) Apply(servicer *ephsServicer) error {
+       return o.apply(servicer)
+}
+
+func WithAcl(acl *Acl) Option {
+       return &genericOption{
+               apply: func(s *ephsServicer) error {
+                       s.acl = acl
+                       return nil
+               },
+       }
+}
+
+func (s *ephsServicer) watchAcl(aclFile string) {
+       handle := func(filePath string, op fsnotify.Op) {
+               if filePath != aclFile {
+                       return
+               }
+               if acl, err := LoadAcl(filePath); err == nil {
+                       s.acl = acl
+                       s.logger.Noticef("reloaded ACLs from %s", filePath)
+               } else {
+                       s.logger.Warningf("error reloading ACLs from %s, rules not reloaded: %v", filePath, err)
+               }
+       }
+       fsnotify.NotifyPath(aclFile, handle)
+}
+
+func WithAclFile(aclFile string) Option {
+       return &genericOption{
+               apply: func(s *ephsServicer) error {
+                       acl, err := LoadAcl(aclFile)
+                       if err != nil {
+                               return err
+                       }
+                       s.logger.Noticef("loaded %d ACL rules from %s",
+                               len(acl.Rules),
+                               aclFile)
+                       s.acl = acl
+                       s.watchAcl(aclFile)
+                       return nil
+               },
+       }
+}
+
+func NewEphsServicer(opts ...Option) (ephs_proto.EphsServer, error) {
+       serv := &ephsServicer{
+               clients: make(map[string]*etcd_client.Client),
+               logger:  log.Default().WithPrefix("ephsServicer"),
+       }
+       if _, err := serv.clientForCell(cell); err != nil {
+               return nil, err
+       }
+       for _, o := range opts {
+               if err := o.Apply(serv); err != nil {
+                       return nil, err
+               }
+       }
+
+       s3, err := serv.newS3Client()
+       if err != nil {
+               return nil, err
+       }
+       serv.s3Client = s3
+
+       return serv, nil
+}
+
+func (s *ephsServicer) clientForCell(cell string) (*etcd_client.Client, error) {
+       s.clientMu.Lock()
+       defer s.clientMu.Unlock()
+
+       if _, ok := s.clients[cell]; !ok {
+               client, err := sd.NewEtcdClient(
+                       mtls.DefaultIdentity(),
+                       cell,
+               )
+               if err != nil {
+                       return nil, err
+               }
+               s.clients[cell] = client
+       }
+
+       return s.clients[cell], nil
+}
+
+func (s *ephsServicer) checkPath(ctx context.Context, path string) (string, error) {
+       if !ephs.IsEphsPath(path) {
+               return "", status.Errorf(codes.InvalidArgument, "path %q must start with "+ephs.KeyPrefix, path)
+       }
+
+       parts := strings.Split(path, "/")
+
+       if len(parts) < 3 {
+               return "", status.Errorf(
+                       codes.InvalidArgument,
+                       "path must be at least 3 levels deep: %q",
+                       path)
+       }
+
+       for i, part := range parts {
+               if (i > 0 && part == "") || part == "." || part == ".." {
+                       return "", status.Errorf(codes.InvalidArgument,
+                               "path %q is invalid: may not contain empty elements, '.' or '..'", path)
+               }
+       }
+
+       ident, err := identityFromContext(ctx)
+       if err != nil {
+               return "", err
+       }
+
+       aclPath := strings.Join(parts[2:], "/")
+
+       if s.acl != nil {
+               if !s.acl.Check(ident, aclPath) {
+                       return "", status.Errorf(
+                               codes.PermissionDenied,
+                               "access to path %q denied for %q",
+                               aclPath,
+                               ident)
+               }
+       }
+
+       if parts[2] == "local" {
+               parts[2] = cell
+       }
+
+       return strings.Join(parts, "/"), nil
+}
+
+func (s *ephsServicer) Stat(ctx context.Context, req *ephs_proto.GetRequest) (*ephs_proto.StatResponse, error) {
+       s.logRequest(ctx, "Stat", req)
+       entry, err := s.getPath(ctx, req.Path)
+       if err != nil {
+               return nil, err
+       }
+
+       return &ephs_proto.StatResponse{
+               Entry: entry,
+       }, nil
+}
+
+func (s *ephsServicer) getPath(ctx context.Context, path string) (*ephs_proto.FsEntry, error) {
+       var err error
+       if path, err = s.checkPath(ctx, path); err != nil {
+               return nil, err
+       }
+
+       cell := strings.Split(path, "/")[2]
+       etcd, err := s.clientForCell(cell)
+       if err != nil {
+               return nil, err
+       }
+
+       obj, err := etcd.Get(ctx, path)
+       if err != nil {
+               return nil, err
+       }
+
+       if len(obj.Kvs) < 1 {
+               return nil, status.Errorf(
+                       codes.NotFound,
+                       "object does not exist: %q",
+                       path)
+       }
+
+       entry := &ephs_proto.FsEntry{}
+       if err := proto.Unmarshal(obj.Kvs[0].Value, entry); err != nil {
+               return nil, status.Errorf(
+                       codes.DataLoss,
+                       "failed to unmarshal key %q as %s: %v",
+                       obj.Kvs[0].Key,
+                       proto.MessageName(entry),
+                       err)
+       }
+
+       return entry, nil
+}
+
+func (s *ephsServicer) modifyDirectory(ctx context.Context, path string, apply func(*ephs_proto.FsEntry_Directory, etcd_concurrency.STM) error) error {
+       var err error
+       if path, err = s.checkPath(ctx, path); err != nil {
+               return err
+       }
+
+       cell := strings.Split(path, "/")[2]
+       etcd, err := s.clientForCell(cell)
+       if err != nil {
+               return err
+       }
+
+       txnResp, err := etcd_concurrency.NewSTM(etcd, func(stm etcd_concurrency.STM) error {
+               dirRaw := []byte(stm.Get(path))
+               dir := newDir(ctx)
+               if len(dirRaw) > 0 {
+                       if err := proto.Unmarshal(dirRaw, dir); err != nil {
+                               return fmt.Errorf("error unmarshaling FsEntry at %q: %v", path, err)
+                       }
+               }
+
+               if dir.Content.GetDirectory() == nil {
+                       return fmt.Errorf("ephs path %q: not a directory", path)
+               }
+
+               if err = apply(dir.Content.GetDirectory(), stm); err != nil {
+                       return fmt.Errorf("error performing atomic directory modification: %v", err)
+               }
+
+               dir.Modified = timestamppb.Now()
+               dir.Version += 1
+
+               if dirRaw, err = proto.Marshal(dir); err != nil {
+                       return fmt.Errorf("error re-marshaling FsEntry at %q after calling apply: %v", path, err)
+               }
+
+               stm.Put(path, string(dirRaw))
+
+               return nil
+       })
+
+       s.logger.Debugf("transaction status modifying directory %q: %+v", path, txnResp)
+
+       return err
+}
+
+func (s *ephsServicer) logRequest(ctx context.Context, method string, req msgWithPath) {
+       id, err := identityFromContext(ctx)
+       if err != nil {
+               return
+       }
+
+       s.logger.AppendPrefix("]["+id).Infof("%s(%q)", method, req.GetPath())
+}
+
+func (s *ephsServicer) Get(req *ephs_proto.GetRequest, server ephs_proto.Ephs_GetServer) error {
+       ctx := server.Context()
+       s.logRequest(ctx, "Get", req)
+
+       entry, err := s.getPath(ctx, req.Path)
+       if err != nil {
+               return err
+       }
+
+       if entry.Content.GetDirectory() != nil {
+               return status.Errorf(
+                       codes.FailedPrecondition,
+                       "%q: is a directory",
+                       req.Path)
+       }
+
+       entryCopy := *entry
+
+       response := &ephs_proto.GetResponse{
+               Entry: &entryCopy,
+       }
+
+       response.Entry.Content = nil
+
+       ent := &FsEntry{entry}
+       reader, err := ent.GetContents(ctx, s.s3Client)
+       if err != nil {
+               return err
+       }
+
+       writer := &responseWriter{server: server, response: response}
+       n, err := io.Copy(writer, reader)
+       if err != nil {
+               s.logger.Error(err)
+               return err
+       }
+       s.logger.V(1).Noticef("Get(%q): wrote %d bytes", req.Path, n)
+
+       return nil
+}
+
+func (s *ephsServicer) mkdir(ctx context.Context, pathName string, recursive bool) error {
+       parts := strings.Split(pathName, "/")
+
+       cell := parts[2]
+       etcd, err := s.clientForCell(cell)
+       if err != nil {
+               return err
+       }
+
+       txn, err := etcd_concurrency.NewSTM(etcd, func(stm etcd_concurrency.STM) error {
+               var (
+                       dir *ephs_proto.FsEntry = newDir(ctx)
+                       err error
+               )
+               for i := 3; i < len(parts); i++ {
+                       parent := strings.Join(parts[:i], "/")
+
+                       dirRaw := []byte(stm.Get(parent))
+                       if len(dirRaw) == 0 {
+                               if !recursive {
+                                       return status.Errorf(codes.NotFound, "cannot mkdir %q: parent directory %q does not exist and recursion is disabled", pathName, parent)
+                               }
+
+                               dir = newDir(ctx)
+                       } else {
+                               err = proto.Unmarshal(dirRaw, dir)
+                               if err != nil {
+                                       return status.Errorf(codes.Internal,
+                                               "error unmarshaling parent directory %q: %T: %v",
+                                               parent, err, err)
+                               }
+
+                               if dir.Content.GetDirectory() == nil {
+                                       return status.Errorf(
+                                               codes.FailedPrecondition,
+                                               "cannot mkdir %q: parent item %q is not a directory (raw len: %d)",
+                                               pathName, parent, len(dirRaw))
+                               }
+
+                               dir.Modified = timestamppb.Now()
+                               dir.Version += 1
+                       }
+
+                       dEnt := dir.Content.GetDirectory()
+
+                       dEnt.Entries = append(dEnt.Entries, &ephs_proto.FsEntry_Directory_DirectoryEntry{
+                               Name:      parts[i],
+                               Directory: true,
+                       })
+
+                       dirRaw, err = proto.Marshal(dir)
+                       if err != nil {
+                               return status.Errorf(codes.Internal,
+                                       "error marshaling parent directory %q: %T: %v",
+                                       parent, err, err)
+                       }
+
+                       stm.Put(parent, string(dirRaw))
+               }
+
+               entry := []byte(stm.Get(pathName))
+               if len(entry) > 0 {
+                       return status.Errorf(codes.AlreadyExists,
+                               "cannot create directory %q: already exists",
+                               pathName)
+               }
+
+               dirRaw, err := proto.Marshal(newDir(ctx))
+               if err != nil {
+                       return status.Errorf(codes.Internal,
+                               "error marshaling new directory %q: %T: %v",
+                               pathName, err, err)
+               }
+
+               stm.Put(pathName, string(dirRaw))
+               return nil
+       })
+       if err != nil {
+               return err
+       }
+       if !txn.Succeeded {
+               return fmt.Errorf("transaction failed: %+v", txn)
+       }
+
+       return nil
+}
+
+func (s *ephsServicer) MkDir(ctx context.Context, req *ephs_proto.MkDirRequest) (*emptypb.Empty, error) {
+       path, err := s.checkPath(ctx, req.Path)
+       if err != nil {
+               return nil, err
+       }
+
+       return &emptypb.Empty{}, s.mkdir(ctx, path, req.Recursive)
+}
+
+func (s *ephsServicer) Delete(ctx context.Context, req *ephs_proto.DeleteRequest) (*emptypb.Empty, error) {
+       s.logRequest(ctx, "Delete", req)
+       targetPath, err := s.checkPath(ctx, req.Path)
+       if err != nil {
+               return nil, err
+       }
+
+       entry, err := s.getPath(ctx, targetPath)
+       if err != nil {
+               return nil, err
+       }
+
+       fse := &FsEntry{entry}
+       if fse.Content.GetDirectory() != nil {
+               if len(fse.Content.GetDirectory().Entries) > 0 && !req.Recursive {
+                       return nil, status.Errorf(codes.FailedPrecondition,
+                               "refusing to delete non-empty directory %q without recursion flag set",
+                               targetPath)
+               }
+
+               for _, ent := range fse.Content.GetDirectory().Entries {
+                       subReq := &ephs_proto.DeleteRequest{
+                               Path:      path.Join(targetPath, ent.Name),
+                               Recursive: true,
+                       }
+                       _, err = s.Delete(ctx, subReq)
+                       if err != nil {
+                               return nil, err
+                       }
+               }
+       } else {
+               if err := fse.DeleteContents(ctx, s.s3Client); err != nil {
+                       return nil, err
+               }
+       }
+
+       dir := path.Dir(targetPath)
+       err = s.modifyDirectory(ctx, dir, func(dEnt *ephs_proto.FsEntry_Directory, stm etcd_concurrency.STM) error {
+               index := -1
+               for i, ent := range dEnt.Entries {
+                       if ent.Name == path.Base(targetPath) {
+                               index = i
+                               break
+                       }
+               }
+               if index >= 0 {
+                       dEnt.Entries = slices.Delete(dEnt.Entries, index, index+1)
+               }
+               stm.Del(targetPath)
+               return nil
+       })
+       if err != nil {
+               return nil, err
+       }
+
+       return &emptypb.Empty{}, nil
+}
+
+func (s *ephsServicer) Put(server ephs_proto.Ephs_PutServer) error {
+       msg, err := server.Recv()
+       if err != nil {
+               return err
+       }
+
+       ctx := server.Context()
+       s.logRequest(ctx, "Put", msg)
+
+       peerId, err := identityFromContext(ctx)
+       if err != nil {
+               return err
+       }
+
+       targetPath, err := s.checkPath(ctx, msg.Path)
+       if err != nil {
+               return err
+       }
+
+       dir := path.Dir(targetPath)
+       _, err = s.getPath(ctx, dir)
+       if err != nil {
+               if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
+                       return status.Errorf(
+                               codes.FailedPrecondition,
+                               "cannot write %q: parent directory %q does not exist",
+                               targetPath, dir)
+               }
+               return err
+       }
+
+       pbEnt, err := s.getPath(ctx, targetPath)
+       if err != nil {
+               if status, ok := status.FromError(err); ok && status.Code() == codes.NotFound {
+                       // NotFound is only returned after all access checks have succeeded. Ok to
+                       // create file.
+                       now := timestamppb.Now()
+                       pbEnt = &ephs_proto.FsEntry{
+                               Created:  now,
+                               Modified: now,
+                               Version:  msg.Version - 1,
+                               Size:     msg.Size,
+                               Owner:    peerId,
+                       }
+               } else {
+                       // some other error besides not-found
+                       return err
+               }
+       } else {
+               if msg.Version != 0 && msg.Version != (pbEnt.Version+1) {
+                       return status.Errorf(
+                               codes.FailedPrecondition,
+                               "version conflict: attempted to overwrite version %d with %d, "+
+                                       "version number must be current version + 1",
+                               pbEnt.Version,
+                               msg.Version)
+               }
+       }
+       ent := &FsEntry{pbEnt}
+       if pbEnt.Content.GetDirectory() != nil {
+               return status.Errorf(codes.FailedPrecondition,
+                       "cannot write %q: path already exists and is a directory",
+                       targetPath)
+       }
+
+       reader := &putReader{
+               initialMsg: msg,
+               server:     server,
+               buf:        &bytes.Buffer{},
+       }
+
+       if err := ent.SetContents(ctx, s.s3Client, reader, msg.Size); err != nil {
+               return err
+       }
+
+       ent.FsEntry.Modified = timestamppb.Now()
+       ent.FsEntry.Size = msg.Size
+       ent.FsEntry.Version++
+
+       marshaled, err := proto.Marshal(ent.FsEntry)
+       if err != nil {
+               return err
+       }
+
+       err = s.modifyDirectory(ctx, dir, func(dEnt *ephs_proto.FsEntry_Directory, stm etcd_concurrency.STM) error {
+               stm.Put(targetPath, string(marshaled))
+               base := path.Base(targetPath)
+               for _, ent := range dEnt.Entries {
+                       if ent.Name == base {
+                               return nil
+                       }
+               }
+
+               dEnt.Entries = append(dEnt.Entries, &ephs_proto.FsEntry_Directory_DirectoryEntry{
+                       Name:      base,
+                       Directory: false,
+               })
+
+               return nil
+       })
+
+       if err != nil {
+               _ = ent.DeleteContents(ctx, s.s3Client)
+               return err
+       }
+
+       response := &ephs_proto.StatResponse{
+               Entry: ent.FsEntry,
+       }
+       return server.SendAndClose(response)
+}
+
+func (s *ephsServicer) Watch(req *ephs_proto.GetRequest, server ephs_proto.Ephs_WatchServer) error {
+       ctx := etcd_client.WithRequireLeader(server.Context())
+       s.logRequest(ctx, "Watch", req)
+       path, err := s.checkPath(ctx, req.Path)
+       if err != nil {
+               return err
+       }
+
+       response := &ephs_proto.WatchResponse{
+               Entry: nil,
+               Event: ephs_proto.WatchResponse_DELETE,
+       }
+
+       if resp, err := s.getPath(ctx, path); err == nil {
+               response.Entry = resp
+               response.Event = ephs_proto.WatchResponse_CREATE
+       }
+
+       cell := strings.Split(path, "/")[2]
+       etcd, err := s.clientForCell(cell)
+       if err != nil {
+               return err
+       }
+
+       if err := server.Send(response); err != nil {
+               return err
+       }
+
+       for ctx.Err() == nil {
+               watcher := etcd.Watch(ctx, path)
+               for msg := range watcher {
+                       s.logger.Debugf("watcher: got event: %+v", msg)
+                       for _, event := range msg.Events {
+                               if string(event.Kv.Key) != path {
+                                       continue
+                               }
+
+                               response.Entry = &ephs_proto.FsEntry{}
+
+                               if event.IsCreate() {
+                                       response.Event = ephs_proto.WatchResponse_CREATE
+                               } else if event.IsModify() {
+                                       response.Event = ephs_proto.WatchResponse_MODIFY
+                               } else if event.Type == etcd_client.EventTypeDelete {
+                                       response.Entry = nil
+                                       response.Event = ephs_proto.WatchResponse_DELETE
+                               } else {
+                                       continue
+                               }
+
+                               if response.Entry != nil {
+                                       if err = proto.Unmarshal(event.Kv.Value, response.Entry); err != nil {
+                                               s.logger.Errorf("protobuf marshal error: %v", err)
+                                               return err
+                                       }
+                               }
+
+                               if err := server.Send(response); err != nil {
+                                       s.logger.Errorf("stream send error: %v", err)
+                                       return err
+                               }
+                       }
+               }
+       }
+
+       s.logger.Noticef("Watch(%q) ending: ctx err: %v", path, ctx.Err())
+
+       if errors.Is(ctx.Err(), context.Canceled) {
+               return status.Error(codes.Unavailable, "server is shutting down, please reconnect")
+       }
+
+       return nil
+}
+
+func identityFromContext(ctx context.Context) (string, error) {
+       peer, ok := peer.FromContext(ctx)
+       if !ok {
+               return "", status.Error(codes.Unauthenticated, "who are you??")
+       }
+
+       ident, err := grpc.PeerIdentity(peer)
+       if err != nil {
+               return "", status.Errorf(codes.PermissionDenied, "cannot determine your identity from peer info: %v", peer.AuthInfo)
+       }
+
+       return ident.String(), nil
+}
+
+func newDir(ctx context.Context) *ephs_proto.FsEntry {
+       whoami, _ := identityFromContext(ctx)
+
+       return &ephs_proto.FsEntry{
+               Content: &ephs_proto.FsEntry_Content{
+                       Content: &ephs_proto.FsEntry_Content_Directory{
+                               Directory: &ephs_proto.FsEntry_Directory{},
+                       },
+               },
+               Owner:    whoami,
+               Created:  timestamppb.Now(),
+               Modified: timestamppb.Now(),
+               Version:  1,
+       }
+}
+
+func init() {
+       flag.StringVar(&cell, "ephs.cell", hostname.DomainName(), "ephs cell")
+}
diff --git a/ephs/servicer/writer.go b/ephs/servicer/writer.go
new file mode 100644 (file)
index 0000000..6cccc6c
--- /dev/null
@@ -0,0 +1,67 @@
+package servicer
+
+import (
+       "bytes"
+       "fmt"
+
+       ephs_proto "go.fuhry.dev/runtime/proto/service/ephs"
+       "go.fuhry.dev/runtime/utils/log"
+)
+
+type responseWriter struct {
+       server   ephs_proto.Ephs_GetServer
+       response *ephs_proto.GetResponse
+}
+
+func (w *responseWriter) Write(p []byte) (int, error) {
+       log.Default().V(3).Debugf("responseWriter: writing %d bytes", len(p))
+       chunkLen := min(len(p), LargeFileThreshold)
+
+       w.response.Chunk = make([]byte, chunkLen)
+       copy(w.response.Chunk, p)
+
+       if err := w.server.Send(w.response); err != nil {
+               log.Default().Error(err)
+               return 0, err
+       }
+
+       log.Default().V(2).Noticef("wrote %d bytes", chunkLen)
+       return chunkLen, nil
+}
+
+type putReader struct {
+       initialMsg *ephs_proto.PutRequest
+       server     ephs_proto.Ephs_PutServer
+       buf        *bytes.Buffer
+}
+
+func (r *putReader) Read(p []byte) (int, error) {
+       var msg *ephs_proto.PutRequest
+       if r.buf.Len() > 0 {
+               n, err := r.buf.Read(p)
+               return n, err
+       } else if r.initialMsg != nil {
+               msg = r.initialMsg
+               r.initialMsg = nil
+       } else {
+               var err error
+               msg, err = r.server.Recv()
+               if err != nil {
+                       return 0, err
+               }
+       }
+
+       nw, err := r.buf.Write(msg.Chunk)
+       if err != nil {
+               return 0, err
+       }
+       if nw != len(msg.Chunk) {
+               return 0, fmt.Errorf(
+                       "overrun on bytes.Buffer putReader: expected to write %d bytes, "+
+                               "but only wrote %d",
+                       len(r.initialMsg.Chunk),
+                       nw)
+       }
+
+       return r.buf.Read(p)
+}
diff --git a/proto/service/ephs/Makefile b/proto/service/ephs/Makefile
new file mode 100644 (file)
index 0000000..501225c
--- /dev/null
@@ -0,0 +1,14 @@
+PROTO_SRCS := $(wildcard *.proto)
+PROTO_GO_OUTPUT := $(PROTO_SRCS:.proto=.pb.go) $(PROTO_SRCS:.proto=_grpc.pb.go)
+
+$(PROTO_GO_OUTPUT): $(PROTO_SRCS)
+       protoc --go_out=. --go_opt=paths=source_relative \
+               --go-grpc_out=. --go-grpc_opt=paths=source_relative \
+               $(PROTO_SRCS)
+
+pb_go: $(PROTO_GO_OUTPUT)
+
+all: pb_go
+
+clean:
+       rm -fv $(PROTO_GO_OUTPUT)
diff --git a/proto/service/ephs/service.pb.go b/proto/service/ephs/service.pb.go
new file mode 100644 (file)
index 0000000..7e7eb77
--- /dev/null
@@ -0,0 +1,559 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+//     protoc-gen-go v1.36.10
+//     protoc        v6.32.1
+// source: service.proto
+
+package ephs
+
+import (
+       protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+       protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+       emptypb "google.golang.org/protobuf/types/known/emptypb"
+       reflect "reflect"
+       sync "sync"
+       unsafe "unsafe"
+)
+
+const (
+       // Verify that this generated code is sufficiently up-to-date.
+       _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+       // Verify that runtime/protoimpl is sufficiently up-to-date.
+       _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+type WatchResponse_Event int32
+
+const (
+       WatchResponse_UNKNOWN WatchResponse_Event = 0
+       WatchResponse_CREATE  WatchResponse_Event = 1
+       WatchResponse_MODIFY  WatchResponse_Event = 2
+       WatchResponse_DELETE  WatchResponse_Event = 3
+)
+
+// Enum value maps for WatchResponse_Event.
+var (
+       WatchResponse_Event_name = map[int32]string{
+               0: "UNKNOWN",
+               1: "CREATE",
+               2: "MODIFY",
+               3: "DELETE",
+       }
+       WatchResponse_Event_value = map[string]int32{
+               "UNKNOWN": 0,
+               "CREATE":  1,
+               "MODIFY":  2,
+               "DELETE":  3,
+       }
+)
+
+func (x WatchResponse_Event) Enum() *WatchResponse_Event {
+       p := new(WatchResponse_Event)
+       *p = x
+       return p
+}
+
+func (x WatchResponse_Event) String() string {
+       return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
+}
+
+func (WatchResponse_Event) Descriptor() protoreflect.EnumDescriptor {
+       return file_service_proto_enumTypes[0].Descriptor()
+}
+
+func (WatchResponse_Event) Type() protoreflect.EnumType {
+       return &file_service_proto_enumTypes[0]
+}
+
+func (x WatchResponse_Event) Number() protoreflect.EnumNumber {
+       return protoreflect.EnumNumber(x)
+}
+
+// Deprecated: Use WatchResponse_Event.Descriptor instead.
+func (WatchResponse_Event) EnumDescriptor() ([]byte, []int) {
+       return file_service_proto_rawDescGZIP(), []int{6, 0}
+}
+
+type GetRequest struct {
+       state         protoimpl.MessageState `protogen:"open.v1"`
+       Path          string                 `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"`
+       unknownFields protoimpl.UnknownFields
+       sizeCache     protoimpl.SizeCache
+}
+
+func (x *GetRequest) Reset() {
+       *x = GetRequest{}
+       mi := &file_service_proto_msgTypes[0]
+       ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+       ms.StoreMessageInfo(mi)
+}
+
+func (x *GetRequest) String() string {
+       return protoimpl.X.MessageStringOf(x)
+}
+
+func (*GetRequest) ProtoMessage() {}
+
+func (x *GetRequest) ProtoReflect() protoreflect.Message {
+       mi := &file_service_proto_msgTypes[0]
+       if x != nil {
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               if ms.LoadMessageInfo() == nil {
+                       ms.StoreMessageInfo(mi)
+               }
+               return ms
+       }
+       return mi.MessageOf(x)
+}
+
+// Deprecated: Use GetRequest.ProtoReflect.Descriptor instead.
+func (*GetRequest) Descriptor() ([]byte, []int) {
+       return file_service_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *GetRequest) GetPath() string {
+       if x != nil {
+               return x.Path
+       }
+       return ""
+}
+
+type DeleteRequest struct {
+       state         protoimpl.MessageState `protogen:"open.v1"`
+       Path          string                 `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"`
+       Recursive     bool                   `protobuf:"varint,2,opt,name=recursive,proto3" json:"recursive,omitempty"`
+       unknownFields protoimpl.UnknownFields
+       sizeCache     protoimpl.SizeCache
+}
+
+func (x *DeleteRequest) Reset() {
+       *x = DeleteRequest{}
+       mi := &file_service_proto_msgTypes[1]
+       ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+       ms.StoreMessageInfo(mi)
+}
+
+func (x *DeleteRequest) String() string {
+       return protoimpl.X.MessageStringOf(x)
+}
+
+func (*DeleteRequest) ProtoMessage() {}
+
+func (x *DeleteRequest) ProtoReflect() protoreflect.Message {
+       mi := &file_service_proto_msgTypes[1]
+       if x != nil {
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               if ms.LoadMessageInfo() == nil {
+                       ms.StoreMessageInfo(mi)
+               }
+               return ms
+       }
+       return mi.MessageOf(x)
+}
+
+// Deprecated: Use DeleteRequest.ProtoReflect.Descriptor instead.
+func (*DeleteRequest) Descriptor() ([]byte, []int) {
+       return file_service_proto_rawDescGZIP(), []int{1}
+}
+
+func (x *DeleteRequest) GetPath() string {
+       if x != nil {
+               return x.Path
+       }
+       return ""
+}
+
+func (x *DeleteRequest) GetRecursive() bool {
+       if x != nil {
+               return x.Recursive
+       }
+       return false
+}
+
+type MkDirRequest struct {
+       state         protoimpl.MessageState `protogen:"open.v1"`
+       Path          string                 `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"`
+       Recursive     bool                   `protobuf:"varint,2,opt,name=recursive,proto3" json:"recursive,omitempty"`
+       unknownFields protoimpl.UnknownFields
+       sizeCache     protoimpl.SizeCache
+}
+
+func (x *MkDirRequest) Reset() {
+       *x = MkDirRequest{}
+       mi := &file_service_proto_msgTypes[2]
+       ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+       ms.StoreMessageInfo(mi)
+}
+
+func (x *MkDirRequest) String() string {
+       return protoimpl.X.MessageStringOf(x)
+}
+
+func (*MkDirRequest) ProtoMessage() {}
+
+func (x *MkDirRequest) ProtoReflect() protoreflect.Message {
+       mi := &file_service_proto_msgTypes[2]
+       if x != nil {
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               if ms.LoadMessageInfo() == nil {
+                       ms.StoreMessageInfo(mi)
+               }
+               return ms
+       }
+       return mi.MessageOf(x)
+}
+
+// Deprecated: Use MkDirRequest.ProtoReflect.Descriptor instead.
+func (*MkDirRequest) Descriptor() ([]byte, []int) {
+       return file_service_proto_rawDescGZIP(), []int{2}
+}
+
+func (x *MkDirRequest) GetPath() string {
+       if x != nil {
+               return x.Path
+       }
+       return ""
+}
+
+func (x *MkDirRequest) GetRecursive() bool {
+       if x != nil {
+               return x.Recursive
+       }
+       return false
+}
+
+type GetResponse struct {
+       state         protoimpl.MessageState `protogen:"open.v1"`
+       Entry         *FsEntry               `protobuf:"bytes,1,opt,name=entry,proto3" json:"entry,omitempty"`
+       Chunk         []byte                 `protobuf:"bytes,2,opt,name=chunk,proto3" json:"chunk,omitempty"`
+       unknownFields protoimpl.UnknownFields
+       sizeCache     protoimpl.SizeCache
+}
+
+func (x *GetResponse) Reset() {
+       *x = GetResponse{}
+       mi := &file_service_proto_msgTypes[3]
+       ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+       ms.StoreMessageInfo(mi)
+}
+
+func (x *GetResponse) String() string {
+       return protoimpl.X.MessageStringOf(x)
+}
+
+func (*GetResponse) ProtoMessage() {}
+
+func (x *GetResponse) ProtoReflect() protoreflect.Message {
+       mi := &file_service_proto_msgTypes[3]
+       if x != nil {
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               if ms.LoadMessageInfo() == nil {
+                       ms.StoreMessageInfo(mi)
+               }
+               return ms
+       }
+       return mi.MessageOf(x)
+}
+
+// Deprecated: Use GetResponse.ProtoReflect.Descriptor instead.
+func (*GetResponse) Descriptor() ([]byte, []int) {
+       return file_service_proto_rawDescGZIP(), []int{3}
+}
+
+func (x *GetResponse) GetEntry() *FsEntry {
+       if x != nil {
+               return x.Entry
+       }
+       return nil
+}
+
+func (x *GetResponse) GetChunk() []byte {
+       if x != nil {
+               return x.Chunk
+       }
+       return nil
+}
+
+type StatResponse struct {
+       state         protoimpl.MessageState `protogen:"open.v1"`
+       Entry         *FsEntry               `protobuf:"bytes,1,opt,name=entry,proto3" json:"entry,omitempty"`
+       unknownFields protoimpl.UnknownFields
+       sizeCache     protoimpl.SizeCache
+}
+
+func (x *StatResponse) Reset() {
+       *x = StatResponse{}
+       mi := &file_service_proto_msgTypes[4]
+       ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+       ms.StoreMessageInfo(mi)
+}
+
+func (x *StatResponse) String() string {
+       return protoimpl.X.MessageStringOf(x)
+}
+
+func (*StatResponse) ProtoMessage() {}
+
+func (x *StatResponse) ProtoReflect() protoreflect.Message {
+       mi := &file_service_proto_msgTypes[4]
+       if x != nil {
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               if ms.LoadMessageInfo() == nil {
+                       ms.StoreMessageInfo(mi)
+               }
+               return ms
+       }
+       return mi.MessageOf(x)
+}
+
+// Deprecated: Use StatResponse.ProtoReflect.Descriptor instead.
+func (*StatResponse) Descriptor() ([]byte, []int) {
+       return file_service_proto_rawDescGZIP(), []int{4}
+}
+
+func (x *StatResponse) GetEntry() *FsEntry {
+       if x != nil {
+               return x.Entry
+       }
+       return nil
+}
+
+type PutRequest struct {
+       state         protoimpl.MessageState `protogen:"open.v1"`
+       Path          string                 `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"`
+       Version       uint64                 `protobuf:"varint,2,opt,name=version,proto3" json:"version,omitempty"`
+       Size          uint64                 `protobuf:"varint,3,opt,name=size,proto3" json:"size,omitempty"`
+       Chunk         []byte                 `protobuf:"bytes,4,opt,name=chunk,proto3" json:"chunk,omitempty"`
+       unknownFields protoimpl.UnknownFields
+       sizeCache     protoimpl.SizeCache
+}
+
+func (x *PutRequest) Reset() {
+       *x = PutRequest{}
+       mi := &file_service_proto_msgTypes[5]
+       ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+       ms.StoreMessageInfo(mi)
+}
+
+func (x *PutRequest) String() string {
+       return protoimpl.X.MessageStringOf(x)
+}
+
+func (*PutRequest) ProtoMessage() {}
+
+func (x *PutRequest) ProtoReflect() protoreflect.Message {
+       mi := &file_service_proto_msgTypes[5]
+       if x != nil {
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               if ms.LoadMessageInfo() == nil {
+                       ms.StoreMessageInfo(mi)
+               }
+               return ms
+       }
+       return mi.MessageOf(x)
+}
+
+// Deprecated: Use PutRequest.ProtoReflect.Descriptor instead.
+func (*PutRequest) Descriptor() ([]byte, []int) {
+       return file_service_proto_rawDescGZIP(), []int{5}
+}
+
+func (x *PutRequest) GetPath() string {
+       if x != nil {
+               return x.Path
+       }
+       return ""
+}
+
+func (x *PutRequest) GetVersion() uint64 {
+       if x != nil {
+               return x.Version
+       }
+       return 0
+}
+
+func (x *PutRequest) GetSize() uint64 {
+       if x != nil {
+               return x.Size
+       }
+       return 0
+}
+
+func (x *PutRequest) GetChunk() []byte {
+       if x != nil {
+               return x.Chunk
+       }
+       return nil
+}
+
+type WatchResponse struct {
+       state         protoimpl.MessageState `protogen:"open.v1"`
+       Entry         *FsEntry               `protobuf:"bytes,1,opt,name=entry,proto3" json:"entry,omitempty"`
+       Event         WatchResponse_Event    `protobuf:"varint,2,opt,name=event,proto3,enum=fuhry.runtime.service.ephs.WatchResponse_Event" json:"event,omitempty"`
+       unknownFields protoimpl.UnknownFields
+       sizeCache     protoimpl.SizeCache
+}
+
+func (x *WatchResponse) Reset() {
+       *x = WatchResponse{}
+       mi := &file_service_proto_msgTypes[6]
+       ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+       ms.StoreMessageInfo(mi)
+}
+
+func (x *WatchResponse) String() string {
+       return protoimpl.X.MessageStringOf(x)
+}
+
+func (*WatchResponse) ProtoMessage() {}
+
+func (x *WatchResponse) ProtoReflect() protoreflect.Message {
+       mi := &file_service_proto_msgTypes[6]
+       if x != nil {
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               if ms.LoadMessageInfo() == nil {
+                       ms.StoreMessageInfo(mi)
+               }
+               return ms
+       }
+       return mi.MessageOf(x)
+}
+
+// Deprecated: Use WatchResponse.ProtoReflect.Descriptor instead.
+func (*WatchResponse) Descriptor() ([]byte, []int) {
+       return file_service_proto_rawDescGZIP(), []int{6}
+}
+
+func (x *WatchResponse) GetEntry() *FsEntry {
+       if x != nil {
+               return x.Entry
+       }
+       return nil
+}
+
+func (x *WatchResponse) GetEvent() WatchResponse_Event {
+       if x != nil {
+               return x.Event
+       }
+       return WatchResponse_UNKNOWN
+}
+
+var File_service_proto protoreflect.FileDescriptor
+
+const file_service_proto_rawDesc = "" +
+       "\n" +
+       "\rservice.proto\x12\x1afuhry.runtime.service.ephs\x1a\x1bgoogle/protobuf/empty.proto\x1a\vtypes.proto\" \n" +
+       "\n" +
+       "GetRequest\x12\x12\n" +
+       "\x04path\x18\x01 \x01(\tR\x04path\"A\n" +
+       "\rDeleteRequest\x12\x12\n" +
+       "\x04path\x18\x01 \x01(\tR\x04path\x12\x1c\n" +
+       "\trecursive\x18\x02 \x01(\bR\trecursive\"@\n" +
+       "\fMkDirRequest\x12\x12\n" +
+       "\x04path\x18\x01 \x01(\tR\x04path\x12\x1c\n" +
+       "\trecursive\x18\x02 \x01(\bR\trecursive\"^\n" +
+       "\vGetResponse\x129\n" +
+       "\x05entry\x18\x01 \x01(\v2#.fuhry.runtime.service.ephs.FsEntryR\x05entry\x12\x14\n" +
+       "\x05chunk\x18\x02 \x01(\fR\x05chunk\"I\n" +
+       "\fStatResponse\x129\n" +
+       "\x05entry\x18\x01 \x01(\v2#.fuhry.runtime.service.ephs.FsEntryR\x05entry\"d\n" +
+       "\n" +
+       "PutRequest\x12\x12\n" +
+       "\x04path\x18\x01 \x01(\tR\x04path\x12\x18\n" +
+       "\aversion\x18\x02 \x01(\x04R\aversion\x12\x12\n" +
+       "\x04size\x18\x03 \x01(\x04R\x04size\x12\x14\n" +
+       "\x05chunk\x18\x04 \x01(\fR\x05chunk\"\xcb\x01\n" +
+       "\rWatchResponse\x129\n" +
+       "\x05entry\x18\x01 \x01(\v2#.fuhry.runtime.service.ephs.FsEntryR\x05entry\x12E\n" +
+       "\x05event\x18\x02 \x01(\x0e2/.fuhry.runtime.service.ephs.WatchResponse.EventR\x05event\"8\n" +
+       "\x05Event\x12\v\n" +
+       "\aUNKNOWN\x10\x00\x12\n" +
+       "\n" +
+       "\x06CREATE\x10\x01\x12\n" +
+       "\n" +
+       "\x06MODIFY\x10\x02\x12\n" +
+       "\n" +
+       "\x06DELETE\x10\x032\x97\x04\n" +
+       "\x04Ephs\x12Z\n" +
+       "\x03Get\x12&.fuhry.runtime.service.ephs.GetRequest\x1a'.fuhry.runtime.service.ephs.GetResponse\"\x000\x01\x12[\n" +
+       "\x03Put\x12&.fuhry.runtime.service.ephs.PutRequest\x1a(.fuhry.runtime.service.ephs.StatResponse\"\x00(\x01\x12Z\n" +
+       "\x04Stat\x12&.fuhry.runtime.service.ephs.GetRequest\x1a(.fuhry.runtime.service.ephs.StatResponse\"\x00\x12^\n" +
+       "\x05Watch\x12&.fuhry.runtime.service.ephs.GetRequest\x1a).fuhry.runtime.service.ephs.WatchResponse\"\x000\x01\x12K\n" +
+       "\x05MkDir\x12(.fuhry.runtime.service.ephs.MkDirRequest\x1a\x16.google.protobuf.Empty\"\x00\x12M\n" +
+       "\x06Delete\x12).fuhry.runtime.service.ephs.DeleteRequest\x1a\x16.google.protobuf.Empty\"\x00B)Z'go.fuhry.dev/runtime/proto/service/ephsb\x06proto3"
+
+var (
+       file_service_proto_rawDescOnce sync.Once
+       file_service_proto_rawDescData []byte
+)
+
+func file_service_proto_rawDescGZIP() []byte {
+       file_service_proto_rawDescOnce.Do(func() {
+               file_service_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_service_proto_rawDesc), len(file_service_proto_rawDesc)))
+       })
+       return file_service_proto_rawDescData
+}
+
+var file_service_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
+var file_service_proto_msgTypes = make([]protoimpl.MessageInfo, 7)
+var file_service_proto_goTypes = []any{
+       (WatchResponse_Event)(0), // 0: fuhry.runtime.service.ephs.WatchResponse.Event
+       (*GetRequest)(nil),       // 1: fuhry.runtime.service.ephs.GetRequest
+       (*DeleteRequest)(nil),    // 2: fuhry.runtime.service.ephs.DeleteRequest
+       (*MkDirRequest)(nil),     // 3: fuhry.runtime.service.ephs.MkDirRequest
+       (*GetResponse)(nil),      // 4: fuhry.runtime.service.ephs.GetResponse
+       (*StatResponse)(nil),     // 5: fuhry.runtime.service.ephs.StatResponse
+       (*PutRequest)(nil),       // 6: fuhry.runtime.service.ephs.PutRequest
+       (*WatchResponse)(nil),    // 7: fuhry.runtime.service.ephs.WatchResponse
+       (*FsEntry)(nil),          // 8: fuhry.runtime.service.ephs.FsEntry
+       (*emptypb.Empty)(nil),    // 9: google.protobuf.Empty
+}
+var file_service_proto_depIdxs = []int32{
+       8,  // 0: fuhry.runtime.service.ephs.GetResponse.entry:type_name -> fuhry.runtime.service.ephs.FsEntry
+       8,  // 1: fuhry.runtime.service.ephs.StatResponse.entry:type_name -> fuhry.runtime.service.ephs.FsEntry
+       8,  // 2: fuhry.runtime.service.ephs.WatchResponse.entry:type_name -> fuhry.runtime.service.ephs.FsEntry
+       0,  // 3: fuhry.runtime.service.ephs.WatchResponse.event:type_name -> fuhry.runtime.service.ephs.WatchResponse.Event
+       1,  // 4: fuhry.runtime.service.ephs.Ephs.Get:input_type -> fuhry.runtime.service.ephs.GetRequest
+       6,  // 5: fuhry.runtime.service.ephs.Ephs.Put:input_type -> fuhry.runtime.service.ephs.PutRequest
+       1,  // 6: fuhry.runtime.service.ephs.Ephs.Stat:input_type -> fuhry.runtime.service.ephs.GetRequest
+       1,  // 7: fuhry.runtime.service.ephs.Ephs.Watch:input_type -> fuhry.runtime.service.ephs.GetRequest
+       3,  // 8: fuhry.runtime.service.ephs.Ephs.MkDir:input_type -> fuhry.runtime.service.ephs.MkDirRequest
+       2,  // 9: fuhry.runtime.service.ephs.Ephs.Delete:input_type -> fuhry.runtime.service.ephs.DeleteRequest
+       4,  // 10: fuhry.runtime.service.ephs.Ephs.Get:output_type -> fuhry.runtime.service.ephs.GetResponse
+       5,  // 11: fuhry.runtime.service.ephs.Ephs.Put:output_type -> fuhry.runtime.service.ephs.StatResponse
+       5,  // 12: fuhry.runtime.service.ephs.Ephs.Stat:output_type -> fuhry.runtime.service.ephs.StatResponse
+       7,  // 13: fuhry.runtime.service.ephs.Ephs.Watch:output_type -> fuhry.runtime.service.ephs.WatchResponse
+       9,  // 14: fuhry.runtime.service.ephs.Ephs.MkDir:output_type -> google.protobuf.Empty
+       9,  // 15: fuhry.runtime.service.ephs.Ephs.Delete:output_type -> google.protobuf.Empty
+       10, // [10:16] is the sub-list for method output_type
+       4,  // [4:10] is the sub-list for method input_type
+       4,  // [4:4] is the sub-list for extension type_name
+       4,  // [4:4] is the sub-list for extension extendee
+       0,  // [0:4] is the sub-list for field type_name
+}
+
+func init() { file_service_proto_init() }
+func file_service_proto_init() {
+       if File_service_proto != nil {
+               return
+       }
+       file_types_proto_init()
+       type x struct{}
+       out := protoimpl.TypeBuilder{
+               File: protoimpl.DescBuilder{
+                       GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+                       RawDescriptor: unsafe.Slice(unsafe.StringData(file_service_proto_rawDesc), len(file_service_proto_rawDesc)),
+                       NumEnums:      1,
+                       NumMessages:   7,
+                       NumExtensions: 0,
+                       NumServices:   1,
+               },
+               GoTypes:           file_service_proto_goTypes,
+               DependencyIndexes: file_service_proto_depIdxs,
+               EnumInfos:         file_service_proto_enumTypes,
+               MessageInfos:      file_service_proto_msgTypes,
+       }.Build()
+       File_service_proto = out.File
+       file_service_proto_goTypes = nil
+       file_service_proto_depIdxs = nil
+}
diff --git a/proto/service/ephs/service.proto b/proto/service/ephs/service.proto
new file mode 100644 (file)
index 0000000..4f50b6a
--- /dev/null
@@ -0,0 +1,57 @@
+syntax = "proto3";
+import "google/protobuf/empty.proto";
+
+option go_package = "go.fuhry.dev/runtime/proto/service/ephs";
+
+package fuhry.runtime.service.ephs;
+
+message GetRequest {
+    string path = 1;
+}
+
+message DeleteRequest {
+    string path = 1;
+    bool recursive = 2;
+}
+
+message MkDirRequest {
+    string path = 1;
+    bool recursive = 2;
+}
+
+message GetResponse {
+    FsEntry entry = 1;
+    bytes chunk = 2;
+}
+
+message StatResponse {
+    FsEntry entry = 1;
+}
+
+message PutRequest {
+    string path = 1;
+    uint64 version = 2;
+    uint64 size = 3;
+    bytes chunk = 4;
+}
+
+message WatchResponse {
+    enum Event {
+        UNKNOWN = 0;
+        CREATE = 1;
+        MODIFY = 2;
+        DELETE = 3;
+    }
+
+    FsEntry entry = 1;
+    Event event = 2;
+}
+
+service Ephs {
+    rpc Get (GetRequest) returns (stream GetResponse) {}
+    rpc Put (stream PutRequest) returns (StatResponse) {}
+    rpc Stat (GetRequest) returns (StatResponse) {}
+    rpc Watch (GetRequest) returns (stream WatchResponse) {}
+    rpc MkDir (MkDirRequest) returns (google.protobuf.Empty) {}
+    rpc Delete (DeleteRequest) returns (google.protobuf.Empty) {}
+}
diff --git a/proto/service/ephs/service_grpc.pb.go b/proto/service/ephs/service_grpc.pb.go
new file mode 100644 (file)
index 0000000..f02bb0d
--- /dev/null
@@ -0,0 +1,384 @@
+// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
+// versions:
+// - protoc-gen-go-grpc v1.3.0
+// - protoc             v6.32.1
+// source: service.proto
+
+package ephs
+
+import (
+       context "context"
+       grpc "google.golang.org/grpc"
+       codes "google.golang.org/grpc/codes"
+       status "google.golang.org/grpc/status"
+       emptypb "google.golang.org/protobuf/types/known/emptypb"
+)
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the grpc package it is being compiled against.
+// Requires gRPC-Go v1.32.0 or later.
+const _ = grpc.SupportPackageIsVersion7
+
+const (
+       Ephs_Get_FullMethodName    = "/fuhry.runtime.service.ephs.Ephs/Get"
+       Ephs_Put_FullMethodName    = "/fuhry.runtime.service.ephs.Ephs/Put"
+       Ephs_Stat_FullMethodName   = "/fuhry.runtime.service.ephs.Ephs/Stat"
+       Ephs_Watch_FullMethodName  = "/fuhry.runtime.service.ephs.Ephs/Watch"
+       Ephs_MkDir_FullMethodName  = "/fuhry.runtime.service.ephs.Ephs/MkDir"
+       Ephs_Delete_FullMethodName = "/fuhry.runtime.service.ephs.Ephs/Delete"
+)
+
+// EphsClient is the client API for Ephs service.
+//
+// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
+type EphsClient interface {
+       Get(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (Ephs_GetClient, error)
+       Put(ctx context.Context, opts ...grpc.CallOption) (Ephs_PutClient, error)
+       Stat(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (*StatResponse, error)
+       Watch(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (Ephs_WatchClient, error)
+       MkDir(ctx context.Context, in *MkDirRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
+       Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
+}
+
+type ephsClient struct {
+       cc grpc.ClientConnInterface
+}
+
+func NewEphsClient(cc grpc.ClientConnInterface) EphsClient {
+       return &ephsClient{cc}
+}
+
+func (c *ephsClient) Get(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (Ephs_GetClient, error) {
+       stream, err := c.cc.NewStream(ctx, &Ephs_ServiceDesc.Streams[0], Ephs_Get_FullMethodName, opts...)
+       if err != nil {
+               return nil, err
+       }
+       x := &ephsGetClient{stream}
+       if err := x.ClientStream.SendMsg(in); err != nil {
+               return nil, err
+       }
+       if err := x.ClientStream.CloseSend(); err != nil {
+               return nil, err
+       }
+       return x, nil
+}
+
+type Ephs_GetClient interface {
+       Recv() (*GetResponse, error)
+       grpc.ClientStream
+}
+
+type ephsGetClient struct {
+       grpc.ClientStream
+}
+
+func (x *ephsGetClient) Recv() (*GetResponse, error) {
+       m := new(GetResponse)
+       if err := x.ClientStream.RecvMsg(m); err != nil {
+               return nil, err
+       }
+       return m, nil
+}
+
+func (c *ephsClient) Put(ctx context.Context, opts ...grpc.CallOption) (Ephs_PutClient, error) {
+       stream, err := c.cc.NewStream(ctx, &Ephs_ServiceDesc.Streams[1], Ephs_Put_FullMethodName, opts...)
+       if err != nil {
+               return nil, err
+       }
+       x := &ephsPutClient{stream}
+       return x, nil
+}
+
+type Ephs_PutClient interface {
+       Send(*PutRequest) error
+       CloseAndRecv() (*StatResponse, error)
+       grpc.ClientStream
+}
+
+type ephsPutClient struct {
+       grpc.ClientStream
+}
+
+func (x *ephsPutClient) Send(m *PutRequest) error {
+       return x.ClientStream.SendMsg(m)
+}
+
+func (x *ephsPutClient) CloseAndRecv() (*StatResponse, error) {
+       if err := x.ClientStream.CloseSend(); err != nil {
+               return nil, err
+       }
+       m := new(StatResponse)
+       if err := x.ClientStream.RecvMsg(m); err != nil {
+               return nil, err
+       }
+       return m, nil
+}
+
+func (c *ephsClient) Stat(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (*StatResponse, error) {
+       out := new(StatResponse)
+       err := c.cc.Invoke(ctx, Ephs_Stat_FullMethodName, in, out, opts...)
+       if err != nil {
+               return nil, err
+       }
+       return out, nil
+}
+
+func (c *ephsClient) Watch(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (Ephs_WatchClient, error) {
+       stream, err := c.cc.NewStream(ctx, &Ephs_ServiceDesc.Streams[2], Ephs_Watch_FullMethodName, opts...)
+       if err != nil {
+               return nil, err
+       }
+       x := &ephsWatchClient{stream}
+       if err := x.ClientStream.SendMsg(in); err != nil {
+               return nil, err
+       }
+       if err := x.ClientStream.CloseSend(); err != nil {
+               return nil, err
+       }
+       return x, nil
+}
+
+type Ephs_WatchClient interface {
+       Recv() (*WatchResponse, error)
+       grpc.ClientStream
+}
+
+type ephsWatchClient struct {
+       grpc.ClientStream
+}
+
+func (x *ephsWatchClient) Recv() (*WatchResponse, error) {
+       m := new(WatchResponse)
+       if err := x.ClientStream.RecvMsg(m); err != nil {
+               return nil, err
+       }
+       return m, nil
+}
+
+func (c *ephsClient) MkDir(ctx context.Context, in *MkDirRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
+       out := new(emptypb.Empty)
+       err := c.cc.Invoke(ctx, Ephs_MkDir_FullMethodName, in, out, opts...)
+       if err != nil {
+               return nil, err
+       }
+       return out, nil
+}
+
+func (c *ephsClient) Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
+       out := new(emptypb.Empty)
+       err := c.cc.Invoke(ctx, Ephs_Delete_FullMethodName, in, out, opts...)
+       if err != nil {
+               return nil, err
+       }
+       return out, nil
+}
+
+// EphsServer is the server API for Ephs service.
+// All implementations must embed UnimplementedEphsServer
+// for forward compatibility
+type EphsServer interface {
+       Get(*GetRequest, Ephs_GetServer) error
+       Put(Ephs_PutServer) error
+       Stat(context.Context, *GetRequest) (*StatResponse, error)
+       Watch(*GetRequest, Ephs_WatchServer) error
+       MkDir(context.Context, *MkDirRequest) (*emptypb.Empty, error)
+       Delete(context.Context, *DeleteRequest) (*emptypb.Empty, error)
+       mustEmbedUnimplementedEphsServer()
+}
+
+// UnimplementedEphsServer must be embedded to have forward compatible implementations.
+type UnimplementedEphsServer struct {
+}
+
+func (UnimplementedEphsServer) Get(*GetRequest, Ephs_GetServer) error {
+       return status.Errorf(codes.Unimplemented, "method Get not implemented")
+}
+func (UnimplementedEphsServer) Put(Ephs_PutServer) error {
+       return status.Errorf(codes.Unimplemented, "method Put not implemented")
+}
+func (UnimplementedEphsServer) Stat(context.Context, *GetRequest) (*StatResponse, error) {
+       return nil, status.Errorf(codes.Unimplemented, "method Stat not implemented")
+}
+func (UnimplementedEphsServer) Watch(*GetRequest, Ephs_WatchServer) error {
+       return status.Errorf(codes.Unimplemented, "method Watch not implemented")
+}
+func (UnimplementedEphsServer) MkDir(context.Context, *MkDirRequest) (*emptypb.Empty, error) {
+       return nil, status.Errorf(codes.Unimplemented, "method MkDir not implemented")
+}
+func (UnimplementedEphsServer) Delete(context.Context, *DeleteRequest) (*emptypb.Empty, error) {
+       return nil, status.Errorf(codes.Unimplemented, "method Delete not implemented")
+}
+func (UnimplementedEphsServer) mustEmbedUnimplementedEphsServer() {}
+
+// UnsafeEphsServer may be embedded to opt out of forward compatibility for this service.
+// Use of this interface is not recommended, as added methods to EphsServer will
+// result in compilation errors.
+type UnsafeEphsServer interface {
+       mustEmbedUnimplementedEphsServer()
+}
+
+func RegisterEphsServer(s grpc.ServiceRegistrar, srv EphsServer) {
+       s.RegisterService(&Ephs_ServiceDesc, srv)
+}
+
+func _Ephs_Get_Handler(srv interface{}, stream grpc.ServerStream) error {
+       m := new(GetRequest)
+       if err := stream.RecvMsg(m); err != nil {
+               return err
+       }
+       return srv.(EphsServer).Get(m, &ephsGetServer{stream})
+}
+
+type Ephs_GetServer interface {
+       Send(*GetResponse) error
+       grpc.ServerStream
+}
+
+type ephsGetServer struct {
+       grpc.ServerStream
+}
+
+func (x *ephsGetServer) Send(m *GetResponse) error {
+       return x.ServerStream.SendMsg(m)
+}
+
+func _Ephs_Put_Handler(srv interface{}, stream grpc.ServerStream) error {
+       return srv.(EphsServer).Put(&ephsPutServer{stream})
+}
+
+type Ephs_PutServer interface {
+       SendAndClose(*StatResponse) error
+       Recv() (*PutRequest, error)
+       grpc.ServerStream
+}
+
+type ephsPutServer struct {
+       grpc.ServerStream
+}
+
+func (x *ephsPutServer) SendAndClose(m *StatResponse) error {
+       return x.ServerStream.SendMsg(m)
+}
+
+func (x *ephsPutServer) Recv() (*PutRequest, error) {
+       m := new(PutRequest)
+       if err := x.ServerStream.RecvMsg(m); err != nil {
+               return nil, err
+       }
+       return m, nil
+}
+
+func _Ephs_Stat_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+       in := new(GetRequest)
+       if err := dec(in); err != nil {
+               return nil, err
+       }
+       if interceptor == nil {
+               return srv.(EphsServer).Stat(ctx, in)
+       }
+       info := &grpc.UnaryServerInfo{
+               Server:     srv,
+               FullMethod: Ephs_Stat_FullMethodName,
+       }
+       handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+               return srv.(EphsServer).Stat(ctx, req.(*GetRequest))
+       }
+       return interceptor(ctx, in, info, handler)
+}
+
+func _Ephs_Watch_Handler(srv interface{}, stream grpc.ServerStream) error {
+       m := new(GetRequest)
+       if err := stream.RecvMsg(m); err != nil {
+               return err
+       }
+       return srv.(EphsServer).Watch(m, &ephsWatchServer{stream})
+}
+
+type Ephs_WatchServer interface {
+       Send(*WatchResponse) error
+       grpc.ServerStream
+}
+
+type ephsWatchServer struct {
+       grpc.ServerStream
+}
+
+func (x *ephsWatchServer) Send(m *WatchResponse) error {
+       return x.ServerStream.SendMsg(m)
+}
+
+func _Ephs_MkDir_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+       in := new(MkDirRequest)
+       if err := dec(in); err != nil {
+               return nil, err
+       }
+       if interceptor == nil {
+               return srv.(EphsServer).MkDir(ctx, in)
+       }
+       info := &grpc.UnaryServerInfo{
+               Server:     srv,
+               FullMethod: Ephs_MkDir_FullMethodName,
+       }
+       handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+               return srv.(EphsServer).MkDir(ctx, req.(*MkDirRequest))
+       }
+       return interceptor(ctx, in, info, handler)
+}
+
+func _Ephs_Delete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+       in := new(DeleteRequest)
+       if err := dec(in); err != nil {
+               return nil, err
+       }
+       if interceptor == nil {
+               return srv.(EphsServer).Delete(ctx, in)
+       }
+       info := &grpc.UnaryServerInfo{
+               Server:     srv,
+               FullMethod: Ephs_Delete_FullMethodName,
+       }
+       handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+               return srv.(EphsServer).Delete(ctx, req.(*DeleteRequest))
+       }
+       return interceptor(ctx, in, info, handler)
+}
+
+// Ephs_ServiceDesc is the grpc.ServiceDesc for Ephs service.
+// It's only intended for direct use with grpc.RegisterService,
+// and not to be introspected or modified (even as a copy)
+var Ephs_ServiceDesc = grpc.ServiceDesc{
+       ServiceName: "fuhry.runtime.service.ephs.Ephs",
+       HandlerType: (*EphsServer)(nil),
+       Methods: []grpc.MethodDesc{
+               {
+                       MethodName: "Stat",
+                       Handler:    _Ephs_Stat_Handler,
+               },
+               {
+                       MethodName: "MkDir",
+                       Handler:    _Ephs_MkDir_Handler,
+               },
+               {
+                       MethodName: "Delete",
+                       Handler:    _Ephs_Delete_Handler,
+               },
+       },
+       Streams: []grpc.StreamDesc{
+               {
+                       StreamName:    "Get",
+                       Handler:       _Ephs_Get_Handler,
+                       ServerStreams: true,
+               },
+               {
+                       StreamName:    "Put",
+                       Handler:       _Ephs_Put_Handler,
+                       ClientStreams: true,
+               },
+               {
+                       StreamName:    "Watch",
+                       Handler:       _Ephs_Watch_Handler,
+                       ServerStreams: true,
+               },
+       },
+       Metadata: "service.proto",
+}
diff --git a/proto/service/ephs/types.pb.go b/proto/service/ephs/types.pb.go
new file mode 100644 (file)
index 0000000..c7210eb
--- /dev/null
@@ -0,0 +1,545 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+//     protoc-gen-go v1.36.10
+//     protoc        v6.32.1
+// source: types.proto
+
+package ephs
+
+import (
+       protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+       protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+       timestamppb "google.golang.org/protobuf/types/known/timestamppb"
+       reflect "reflect"
+       sync "sync"
+       unsafe "unsafe"
+)
+
+const (
+       // Verify that this generated code is sufficiently up-to-date.
+       _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+       // Verify that runtime/protoimpl is sufficiently up-to-date.
+       _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+type FsEntry_File_Compression int32
+
+const (
+       FsEntry_File_UNCOMPRESSED FsEntry_File_Compression = 0
+       FsEntry_File_GZIP         FsEntry_File_Compression = 1
+)
+
+// Enum value maps for FsEntry_File_Compression.
+var (
+       FsEntry_File_Compression_name = map[int32]string{
+               0: "UNCOMPRESSED",
+               1: "GZIP",
+       }
+       FsEntry_File_Compression_value = map[string]int32{
+               "UNCOMPRESSED": 0,
+               "GZIP":         1,
+       }
+)
+
+func (x FsEntry_File_Compression) Enum() *FsEntry_File_Compression {
+       p := new(FsEntry_File_Compression)
+       *p = x
+       return p
+}
+
+func (x FsEntry_File_Compression) String() string {
+       return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
+}
+
+func (FsEntry_File_Compression) Descriptor() protoreflect.EnumDescriptor {
+       return file_types_proto_enumTypes[0].Descriptor()
+}
+
+func (FsEntry_File_Compression) Type() protoreflect.EnumType {
+       return &file_types_proto_enumTypes[0]
+}
+
+func (x FsEntry_File_Compression) Number() protoreflect.EnumNumber {
+       return protoreflect.EnumNumber(x)
+}
+
+// Deprecated: Use FsEntry_File_Compression.Descriptor instead.
+func (FsEntry_File_Compression) EnumDescriptor() ([]byte, []int) {
+       return file_types_proto_rawDescGZIP(), []int{0, 1, 0}
+}
+
+type FsEntry struct {
+       state         protoimpl.MessageState `protogen:"open.v1"`
+       Created       *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=created,proto3" json:"created,omitempty"`
+       Modified      *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=modified,proto3" json:"modified,omitempty"`
+       Version       uint64                 `protobuf:"varint,3,opt,name=version,proto3" json:"version,omitempty"`
+       Size          uint64                 `protobuf:"varint,4,opt,name=size,proto3" json:"size,omitempty"`
+       Owner         string                 `protobuf:"bytes,5,opt,name=owner,proto3" json:"owner,omitempty"`
+       Content       *FsEntry_Content       `protobuf:"bytes,6,opt,name=content,proto3" json:"content,omitempty"`
+       unknownFields protoimpl.UnknownFields
+       sizeCache     protoimpl.SizeCache
+}
+
+func (x *FsEntry) Reset() {
+       *x = FsEntry{}
+       mi := &file_types_proto_msgTypes[0]
+       ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+       ms.StoreMessageInfo(mi)
+}
+
+func (x *FsEntry) String() string {
+       return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FsEntry) ProtoMessage() {}
+
+func (x *FsEntry) ProtoReflect() protoreflect.Message {
+       mi := &file_types_proto_msgTypes[0]
+       if x != nil {
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               if ms.LoadMessageInfo() == nil {
+                       ms.StoreMessageInfo(mi)
+               }
+               return ms
+       }
+       return mi.MessageOf(x)
+}
+
+// Deprecated: Use FsEntry.ProtoReflect.Descriptor instead.
+func (*FsEntry) Descriptor() ([]byte, []int) {
+       return file_types_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *FsEntry) GetCreated() *timestamppb.Timestamp {
+       if x != nil {
+               return x.Created
+       }
+       return nil
+}
+
+func (x *FsEntry) GetModified() *timestamppb.Timestamp {
+       if x != nil {
+               return x.Modified
+       }
+       return nil
+}
+
+func (x *FsEntry) GetVersion() uint64 {
+       if x != nil {
+               return x.Version
+       }
+       return 0
+}
+
+func (x *FsEntry) GetSize() uint64 {
+       if x != nil {
+               return x.Size
+       }
+       return 0
+}
+
+func (x *FsEntry) GetOwner() string {
+       if x != nil {
+               return x.Owner
+       }
+       return ""
+}
+
+func (x *FsEntry) GetContent() *FsEntry_Content {
+       if x != nil {
+               return x.Content
+       }
+       return nil
+}
+
+type FsEntry_Directory struct {
+       state         protoimpl.MessageState              `protogen:"open.v1"`
+       Entries       []*FsEntry_Directory_DirectoryEntry `protobuf:"bytes,1,rep,name=entries,proto3" json:"entries,omitempty"`
+       unknownFields protoimpl.UnknownFields
+       sizeCache     protoimpl.SizeCache
+}
+
+func (x *FsEntry_Directory) Reset() {
+       *x = FsEntry_Directory{}
+       mi := &file_types_proto_msgTypes[1]
+       ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+       ms.StoreMessageInfo(mi)
+}
+
+func (x *FsEntry_Directory) String() string {
+       return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FsEntry_Directory) ProtoMessage() {}
+
+func (x *FsEntry_Directory) ProtoReflect() protoreflect.Message {
+       mi := &file_types_proto_msgTypes[1]
+       if x != nil {
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               if ms.LoadMessageInfo() == nil {
+                       ms.StoreMessageInfo(mi)
+               }
+               return ms
+       }
+       return mi.MessageOf(x)
+}
+
+// Deprecated: Use FsEntry_Directory.ProtoReflect.Descriptor instead.
+func (*FsEntry_Directory) Descriptor() ([]byte, []int) {
+       return file_types_proto_rawDescGZIP(), []int{0, 0}
+}
+
+func (x *FsEntry_Directory) GetEntries() []*FsEntry_Directory_DirectoryEntry {
+       if x != nil {
+               return x.Entries
+       }
+       return nil
+}
+
+type FsEntry_File struct {
+       state         protoimpl.MessageState   `protogen:"open.v1"`
+       Content       []byte                   `protobuf:"bytes,1,opt,name=content,proto3" json:"content,omitempty"`
+       Compression   FsEntry_File_Compression `protobuf:"varint,2,opt,name=compression,proto3,enum=fuhry.runtime.service.ephs.FsEntry_File_Compression" json:"compression,omitempty"`
+       unknownFields protoimpl.UnknownFields
+       sizeCache     protoimpl.SizeCache
+}
+
+func (x *FsEntry_File) Reset() {
+       *x = FsEntry_File{}
+       mi := &file_types_proto_msgTypes[2]
+       ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+       ms.StoreMessageInfo(mi)
+}
+
+func (x *FsEntry_File) String() string {
+       return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FsEntry_File) ProtoMessage() {}
+
+func (x *FsEntry_File) ProtoReflect() protoreflect.Message {
+       mi := &file_types_proto_msgTypes[2]
+       if x != nil {
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               if ms.LoadMessageInfo() == nil {
+                       ms.StoreMessageInfo(mi)
+               }
+               return ms
+       }
+       return mi.MessageOf(x)
+}
+
+// Deprecated: Use FsEntry_File.ProtoReflect.Descriptor instead.
+func (*FsEntry_File) Descriptor() ([]byte, []int) {
+       return file_types_proto_rawDescGZIP(), []int{0, 1}
+}
+
+func (x *FsEntry_File) GetContent() []byte {
+       if x != nil {
+               return x.Content
+       }
+       return nil
+}
+
+func (x *FsEntry_File) GetCompression() FsEntry_File_Compression {
+       if x != nil {
+               return x.Compression
+       }
+       return FsEntry_File_UNCOMPRESSED
+}
+
+type FsEntry_LargeFile struct {
+       state         protoimpl.MessageState `protogen:"open.v1"`
+       Key           string                 `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
+       unknownFields protoimpl.UnknownFields
+       sizeCache     protoimpl.SizeCache
+}
+
+func (x *FsEntry_LargeFile) Reset() {
+       *x = FsEntry_LargeFile{}
+       mi := &file_types_proto_msgTypes[3]
+       ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+       ms.StoreMessageInfo(mi)
+}
+
+func (x *FsEntry_LargeFile) String() string {
+       return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FsEntry_LargeFile) ProtoMessage() {}
+
+func (x *FsEntry_LargeFile) ProtoReflect() protoreflect.Message {
+       mi := &file_types_proto_msgTypes[3]
+       if x != nil {
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               if ms.LoadMessageInfo() == nil {
+                       ms.StoreMessageInfo(mi)
+               }
+               return ms
+       }
+       return mi.MessageOf(x)
+}
+
+// Deprecated: Use FsEntry_LargeFile.ProtoReflect.Descriptor instead.
+func (*FsEntry_LargeFile) Descriptor() ([]byte, []int) {
+       return file_types_proto_rawDescGZIP(), []int{0, 2}
+}
+
+func (x *FsEntry_LargeFile) GetKey() string {
+       if x != nil {
+               return x.Key
+       }
+       return ""
+}
+
+type FsEntry_Content struct {
+       state protoimpl.MessageState `protogen:"open.v1"`
+       // Types that are valid to be assigned to Content:
+       //
+       //      *FsEntry_Content_Directory
+       //      *FsEntry_Content_File
+       //      *FsEntry_Content_LargeFile
+       Content       isFsEntry_Content_Content `protobuf_oneof:"content"`
+       unknownFields protoimpl.UnknownFields
+       sizeCache     protoimpl.SizeCache
+}
+
+func (x *FsEntry_Content) Reset() {
+       *x = FsEntry_Content{}
+       mi := &file_types_proto_msgTypes[4]
+       ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+       ms.StoreMessageInfo(mi)
+}
+
+func (x *FsEntry_Content) String() string {
+       return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FsEntry_Content) ProtoMessage() {}
+
+func (x *FsEntry_Content) ProtoReflect() protoreflect.Message {
+       mi := &file_types_proto_msgTypes[4]
+       if x != nil {
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               if ms.LoadMessageInfo() == nil {
+                       ms.StoreMessageInfo(mi)
+               }
+               return ms
+       }
+       return mi.MessageOf(x)
+}
+
+// Deprecated: Use FsEntry_Content.ProtoReflect.Descriptor instead.
+func (*FsEntry_Content) Descriptor() ([]byte, []int) {
+       return file_types_proto_rawDescGZIP(), []int{0, 3}
+}
+
+func (x *FsEntry_Content) GetContent() isFsEntry_Content_Content {
+       if x != nil {
+               return x.Content
+       }
+       return nil
+}
+
+func (x *FsEntry_Content) GetDirectory() *FsEntry_Directory {
+       if x != nil {
+               if x, ok := x.Content.(*FsEntry_Content_Directory); ok {
+                       return x.Directory
+               }
+       }
+       return nil
+}
+
+func (x *FsEntry_Content) GetFile() *FsEntry_File {
+       if x != nil {
+               if x, ok := x.Content.(*FsEntry_Content_File); ok {
+                       return x.File
+               }
+       }
+       return nil
+}
+
+func (x *FsEntry_Content) GetLargeFile() *FsEntry_LargeFile {
+       if x != nil {
+               if x, ok := x.Content.(*FsEntry_Content_LargeFile); ok {
+                       return x.LargeFile
+               }
+       }
+       return nil
+}
+
+type isFsEntry_Content_Content interface {
+       isFsEntry_Content_Content()
+}
+
+type FsEntry_Content_Directory struct {
+       Directory *FsEntry_Directory `protobuf:"bytes,1,opt,name=directory,proto3,oneof"`
+}
+
+type FsEntry_Content_File struct {
+       File *FsEntry_File `protobuf:"bytes,2,opt,name=file,proto3,oneof"`
+}
+
+type FsEntry_Content_LargeFile struct {
+       LargeFile *FsEntry_LargeFile `protobuf:"bytes,3,opt,name=large_file,json=largeFile,proto3,oneof"`
+}
+
+func (*FsEntry_Content_Directory) isFsEntry_Content_Content() {}
+
+func (*FsEntry_Content_File) isFsEntry_Content_Content() {}
+
+func (*FsEntry_Content_LargeFile) isFsEntry_Content_Content() {}
+
+type FsEntry_Directory_DirectoryEntry struct {
+       state         protoimpl.MessageState `protogen:"open.v1"`
+       Name          string                 `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
+       Directory     bool                   `protobuf:"varint,2,opt,name=directory,proto3" json:"directory,omitempty"`
+       unknownFields protoimpl.UnknownFields
+       sizeCache     protoimpl.SizeCache
+}
+
+func (x *FsEntry_Directory_DirectoryEntry) Reset() {
+       *x = FsEntry_Directory_DirectoryEntry{}
+       mi := &file_types_proto_msgTypes[5]
+       ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+       ms.StoreMessageInfo(mi)
+}
+
+func (x *FsEntry_Directory_DirectoryEntry) String() string {
+       return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FsEntry_Directory_DirectoryEntry) ProtoMessage() {}
+
+func (x *FsEntry_Directory_DirectoryEntry) ProtoReflect() protoreflect.Message {
+       mi := &file_types_proto_msgTypes[5]
+       if x != nil {
+               ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+               if ms.LoadMessageInfo() == nil {
+                       ms.StoreMessageInfo(mi)
+               }
+               return ms
+       }
+       return mi.MessageOf(x)
+}
+
+// Deprecated: Use FsEntry_Directory_DirectoryEntry.ProtoReflect.Descriptor instead.
+func (*FsEntry_Directory_DirectoryEntry) Descriptor() ([]byte, []int) {
+       return file_types_proto_rawDescGZIP(), []int{0, 0, 0}
+}
+
+func (x *FsEntry_Directory_DirectoryEntry) GetName() string {
+       if x != nil {
+               return x.Name
+       }
+       return ""
+}
+
+func (x *FsEntry_Directory_DirectoryEntry) GetDirectory() bool {
+       if x != nil {
+               return x.Directory
+       }
+       return false
+}
+
+var File_types_proto protoreflect.FileDescriptor
+
+const file_types_proto_rawDesc = "" +
+       "\n" +
+       "\vtypes.proto\x12\x1afuhry.runtime.service.ephs\x1a\x1fgoogle/protobuf/timestamp.proto\"\xe7\x06\n" +
+       "\aFsEntry\x124\n" +
+       "\acreated\x18\x01 \x01(\v2\x1a.google.protobuf.TimestampR\acreated\x126\n" +
+       "\bmodified\x18\x02 \x01(\v2\x1a.google.protobuf.TimestampR\bmodified\x12\x18\n" +
+       "\aversion\x18\x03 \x01(\x04R\aversion\x12\x12\n" +
+       "\x04size\x18\x04 \x01(\x04R\x04size\x12\x14\n" +
+       "\x05owner\x18\x05 \x01(\tR\x05owner\x12E\n" +
+       "\acontent\x18\x06 \x01(\v2+.fuhry.runtime.service.ephs.FsEntry.ContentR\acontent\x1a\xa7\x01\n" +
+       "\tDirectory\x12V\n" +
+       "\aentries\x18\x01 \x03(\v2<.fuhry.runtime.service.ephs.FsEntry.Directory.DirectoryEntryR\aentries\x1aB\n" +
+       "\x0eDirectoryEntry\x12\x12\n" +
+       "\x04name\x18\x01 \x01(\tR\x04name\x12\x1c\n" +
+       "\tdirectory\x18\x02 \x01(\bR\tdirectory\x1a\xa3\x01\n" +
+       "\x04File\x12\x18\n" +
+       "\acontent\x18\x01 \x01(\fR\acontent\x12V\n" +
+       "\vcompression\x18\x02 \x01(\x0e24.fuhry.runtime.service.ephs.FsEntry.File.CompressionR\vcompression\")\n" +
+       "\vCompression\x12\x10\n" +
+       "\fUNCOMPRESSED\x10\x00\x12\b\n" +
+       "\x04GZIP\x10\x01\x1a\x1d\n" +
+       "\tLargeFile\x12\x10\n" +
+       "\x03key\x18\x01 \x01(\tR\x03key\x1a\xf3\x01\n" +
+       "\aContent\x12M\n" +
+       "\tdirectory\x18\x01 \x01(\v2-.fuhry.runtime.service.ephs.FsEntry.DirectoryH\x00R\tdirectory\x12>\n" +
+       "\x04file\x18\x02 \x01(\v2(.fuhry.runtime.service.ephs.FsEntry.FileH\x00R\x04file\x12N\n" +
+       "\n" +
+       "large_file\x18\x03 \x01(\v2-.fuhry.runtime.service.ephs.FsEntry.LargeFileH\x00R\tlargeFileB\t\n" +
+       "\acontentB)Z'go.fuhry.dev/runtime/proto/service/ephsb\x06proto3"
+
+var (
+       file_types_proto_rawDescOnce sync.Once
+       file_types_proto_rawDescData []byte
+)
+
+func file_types_proto_rawDescGZIP() []byte {
+       file_types_proto_rawDescOnce.Do(func() {
+               file_types_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_types_proto_rawDesc), len(file_types_proto_rawDesc)))
+       })
+       return file_types_proto_rawDescData
+}
+
+var file_types_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
+var file_types_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
+var file_types_proto_goTypes = []any{
+       (FsEntry_File_Compression)(0),            // 0: fuhry.runtime.service.ephs.FsEntry.File.Compression
+       (*FsEntry)(nil),                          // 1: fuhry.runtime.service.ephs.FsEntry
+       (*FsEntry_Directory)(nil),                // 2: fuhry.runtime.service.ephs.FsEntry.Directory
+       (*FsEntry_File)(nil),                     // 3: fuhry.runtime.service.ephs.FsEntry.File
+       (*FsEntry_LargeFile)(nil),                // 4: fuhry.runtime.service.ephs.FsEntry.LargeFile
+       (*FsEntry_Content)(nil),                  // 5: fuhry.runtime.service.ephs.FsEntry.Content
+       (*FsEntry_Directory_DirectoryEntry)(nil), // 6: fuhry.runtime.service.ephs.FsEntry.Directory.DirectoryEntry
+       (*timestamppb.Timestamp)(nil),            // 7: google.protobuf.Timestamp
+}
+var file_types_proto_depIdxs = []int32{
+       7, // 0: fuhry.runtime.service.ephs.FsEntry.created:type_name -> google.protobuf.Timestamp
+       7, // 1: fuhry.runtime.service.ephs.FsEntry.modified:type_name -> google.protobuf.Timestamp
+       5, // 2: fuhry.runtime.service.ephs.FsEntry.content:type_name -> fuhry.runtime.service.ephs.FsEntry.Content
+       6, // 3: fuhry.runtime.service.ephs.FsEntry.Directory.entries:type_name -> fuhry.runtime.service.ephs.FsEntry.Directory.DirectoryEntry
+       0, // 4: fuhry.runtime.service.ephs.FsEntry.File.compression:type_name -> fuhry.runtime.service.ephs.FsEntry.File.Compression
+       2, // 5: fuhry.runtime.service.ephs.FsEntry.Content.directory:type_name -> fuhry.runtime.service.ephs.FsEntry.Directory
+       3, // 6: fuhry.runtime.service.ephs.FsEntry.Content.file:type_name -> fuhry.runtime.service.ephs.FsEntry.File
+       4, // 7: fuhry.runtime.service.ephs.FsEntry.Content.large_file:type_name -> fuhry.runtime.service.ephs.FsEntry.LargeFile
+       8, // [8:8] is the sub-list for method output_type
+       8, // [8:8] is the sub-list for method input_type
+       8, // [8:8] is the sub-list for extension type_name
+       8, // [8:8] is the sub-list for extension extendee
+       0, // [0:8] is the sub-list for field type_name
+}
+
+func init() { file_types_proto_init() }
+func file_types_proto_init() {
+       if File_types_proto != nil {
+               return
+       }
+       file_types_proto_msgTypes[4].OneofWrappers = []any{
+               (*FsEntry_Content_Directory)(nil),
+               (*FsEntry_Content_File)(nil),
+               (*FsEntry_Content_LargeFile)(nil),
+       }
+       type x struct{}
+       out := protoimpl.TypeBuilder{
+               File: protoimpl.DescBuilder{
+                       GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+                       RawDescriptor: unsafe.Slice(unsafe.StringData(file_types_proto_rawDesc), len(file_types_proto_rawDesc)),
+                       NumEnums:      1,
+                       NumMessages:   6,
+                       NumExtensions: 0,
+                       NumServices:   0,
+               },
+               GoTypes:           file_types_proto_goTypes,
+               DependencyIndexes: file_types_proto_depIdxs,
+               EnumInfos:         file_types_proto_enumTypes,
+               MessageInfos:      file_types_proto_msgTypes,
+       }.Build()
+       File_types_proto = out.File
+       file_types_proto_goTypes = nil
+       file_types_proto_depIdxs = nil
+}
diff --git a/proto/service/ephs/types.proto b/proto/service/ephs/types.proto
new file mode 100644 (file)
index 0000000..b5d011b
--- /dev/null
@@ -0,0 +1,45 @@
+syntax = "proto3";
+
+import "google/protobuf/timestamp.proto";
+
+option go_package = "go.fuhry.dev/runtime/proto/service/ephs";
+
+package fuhry.runtime.service.ephs;
+
+message FsEntry {
+    message Directory {
+        message DirectoryEntry {
+            string name = 1;
+            bool directory = 2;
+        }
+        repeated DirectoryEntry entries = 1;
+    }
+
+    message File {
+        enum Compression {
+            UNCOMPRESSED = 0;
+            GZIP = 1;
+        }
+        bytes content = 1;
+        Compression compression = 2;
+    }
+
+    message LargeFile {
+        string key = 1;
+    }
+
+    message Content {
+        oneof content {
+            Directory directory = 1;
+            File file = 2;
+            LargeFile large_file = 3;
+        }
+    }
+
+    google.protobuf.Timestamp created = 1;
+    google.protobuf.Timestamp modified = 2;
+    uint64 version = 3;
+    uint64 size = 4;
+    string owner = 5;
+    Content content = 6;
+}
diff --git a/utils/stringmatch/serialization2.go b/utils/stringmatch/serialization2.go
new file mode 100644 (file)
index 0000000..a2bb20b
--- /dev/null
@@ -0,0 +1,114 @@
+package stringmatch
+
+import (
+       "errors"
+       "fmt"
+)
+
+type NewSyntaxMatchRule struct {
+       Prefix *struct {
+               V string `yaml:"prefix"`
+       } `yaml:",inline"`
+       Suffix *struct {
+               V string `yaml:"suffix"`
+       } `yaml:",inline"`
+       Exact *struct {
+               V string `yaml:"exact"`
+       } `yaml:",inline"`
+       Contains *struct {
+               V string `yaml:"contains"`
+       } `yaml:",inline"`
+       Regexp *struct {
+               V string `yaml:"regexp"`
+       } `yaml:",inline"`
+       Any *struct {
+               V bool `yaml:"any"`
+       } `yaml:",inline"`
+       Never *struct {
+               V bool `yaml:"never"`
+       } `yaml:",inline"`
+       And *struct {
+               V []*NewSyntaxMatchRule `yaml:"and"`
+       } `yaml:",inline"`
+       Or *struct {
+               V []*NewSyntaxMatchRule `yaml:"or"`
+       } `yaml:",inline"`
+}
+
+func (m *NewSyntaxMatchRule) Matcher() (StringMatcher, error) {
+       var out StringMatcher
+       count := 0
+
+       if m.Prefix != nil {
+               out = Prefix(m.Prefix.V)
+               count++
+       }
+       if m.Suffix != nil {
+               out = Suffix(m.Suffix.V)
+               count++
+       }
+       if m.Exact != nil {
+               out = Exact(m.Exact.V)
+               count++
+       }
+       if m.Contains != nil {
+               out = Contains(m.Contains.V)
+               count++
+       }
+       if m.Regexp != nil {
+               out = Regexp(m.Regexp.V)
+               count++
+       }
+       if m.Any != nil {
+               if m.Any.V != true {
+                       return nil, errors.New("any may only be true")
+               }
+               out = Any{}
+               count++
+       }
+       if m.Never != nil && m.Never.V {
+               if m.Never.V != true {
+                       return nil, errors.New("never may only be true")
+               }
+               out = Never{}
+               count++
+       }
+       if m.And != nil && len(m.And.V) > 0 {
+               count++
+               out = &andMatcher{}
+               for i, m := range m.And.V {
+                       mat, err := m.Matcher()
+                       if err != nil {
+                               return nil, fmt.Errorf("and-rule at index %d: %v", i, err)
+                       }
+                       out.(*andMatcher).matchers = append(out.(*andMatcher).matchers, mat)
+               }
+       }
+       if m.Or != nil && len(m.Or.V) > 0 {
+               count++
+               out = &orMatcher{}
+               for i, m := range m.Or.V {
+                       mat, err := m.Matcher()
+                       if err != nil {
+                               return nil, fmt.Errorf("or-rule at index %d: %v", i, err)
+                       }
+                       out.(*orMatcher).matchers = append(out.(*orMatcher).matchers, mat)
+               }
+       }
+
+       if count != 1 {
+               return nil, fmt.Errorf(
+                       "match rule must contain exactly one of: prefix, suffix, exact, " +
+                               "contains, regexp, any, never, and, or")
+       }
+
+       return out, nil
+}
+
+func (m *NewSyntaxMatchRule) Match(v string) bool {
+       mat, err := m.Matcher()
+       if err != nil {
+               panic(err)
+       }
+       return mat.Match(v)
+}
diff --git a/utils/stringmatch/serialization2_test.go b/utils/stringmatch/serialization2_test.go
new file mode 100644 (file)
index 0000000..3b5c140
--- /dev/null
@@ -0,0 +1,88 @@
+package stringmatch
+
+import (
+       "bytes"
+       "fmt"
+       "os"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "gopkg.in/yaml.v3"
+)
+
+type ptr[T any] interface {
+       *T
+}
+
+func mustDecode[T any, PT ptr[T]](t *testing.T, str string) PT {
+       out := new(T)
+       buf := bytes.NewBufferString(str)
+       decoder := yaml.NewDecoder(buf)
+       if err := decoder.Decode(out); err != nil {
+               fmt.Fprintf(os.Stderr, "error decoding yaml %q: %v", str, err)
+               t.FailNow()
+       }
+
+       return out
+}
+
+func TestNSMExact(t *testing.T) {
+       m := mustDecode[NewSyntaxMatchRule](t, "exact: meh")
+       assert.True(t, m.Match("meh"))
+}
+
+func TestNSMPrefix(t *testing.T) {
+       m := mustDecode[NewSyntaxMatchRule](t, "prefix: meh")
+       assert.True(t, m.Match("mehh"))
+       assert.False(t, m.Match("ehh"))
+}
+
+func TestNSMSuffix(t *testing.T) {
+       m := mustDecode[NewSyntaxMatchRule](t, "suffix: eh")
+       assert.True(t, m.Match("meh"))
+       assert.False(t, m.Match("ehh"))
+}
+
+func TestNSMContains(t *testing.T) {
+       m := mustDecode[NewSyntaxMatchRule](t, "contains: eh")
+       assert.True(t, m.Match("meh"))
+       assert.False(t, m.Match("uh"))
+}
+
+func TestNSMRegexp(t *testing.T) {
+       m := mustDecode[NewSyntaxMatchRule](t, "regexp: ^meh$")
+       assert.True(t, m.Match("meh"))
+       assert.False(t, m.Match("ehh"))
+}
+
+func TestNSMAnd(t *testing.T) {
+       const rules = `and:
+  - prefix: f
+  - suffix: oo
+`
+       m := mustDecode[NewSyntaxMatchRule](t, rules)
+       assert.True(t, m.Match("foo"))
+       assert.False(t, m.Match("boo"))
+}
+
+func TestNSMOr(t *testing.T) {
+       const rules = `or:
+  - prefix: f
+  - suffix: oo
+`
+       m := mustDecode[NewSyntaxMatchRule](t, rules)
+       assert.True(t, m.Match("foo"))
+       assert.True(t, m.Match("boo"))
+       assert.False(t, m.Match("bar"))
+}
+
+func TestNSMErrors(t *testing.T) {
+       _, err := mustDecode[NewSyntaxMatchRule](t, "startswith: invalid").Matcher()
+       assert.Error(t, err)
+
+       _, err = mustDecode[NewSyntaxMatchRule](t, "contains: a\nprefix: b").Matcher()
+       assert.Error(t, err)
+
+       _, err = mustDecode[NewSyntaxMatchRule](t, "any: false\nprefix: b").Matcher()
+       assert.Error(t, err)
+}