From 205b24ba86f02f273a80bda8efa24fefe3f90933 Mon Sep 17 00:00:00 2001 From: Dan Fuhry Date: Thu, 6 Nov 2025 06:53:00 -0500 Subject: [PATCH] Add ephs ("ephemeral filesystem") --- config_watcher/backend_ephs.go | 65 +++ config_watcher/backend_local.go | 94 +++ config_watcher/watcher.go | 24 + ephs/client.go | 467 +++++++++++++++ ephs/servicer/acl.go | 109 ++++ ephs/servicer/acl_test.go | 103 ++++ ephs/servicer/fs_object.go | 204 +++++++ ephs/servicer/s3.go | 49 ++ ephs/servicer/servicer.go | 709 +++++++++++++++++++++++ ephs/servicer/writer.go | 67 +++ proto/service/ephs/Makefile | 14 + proto/service/ephs/service.pb.go | 559 ++++++++++++++++++ proto/service/ephs/service.proto | 57 ++ proto/service/ephs/service_grpc.pb.go | 384 ++++++++++++ proto/service/ephs/types.pb.go | 545 +++++++++++++++++ proto/service/ephs/types.proto | 45 ++ utils/stringmatch/serialization2.go | 114 ++++ utils/stringmatch/serialization2_test.go | 88 +++ 18 files changed, 3697 insertions(+) create mode 100644 config_watcher/backend_ephs.go create mode 100644 config_watcher/backend_local.go create mode 100644 config_watcher/watcher.go create mode 100644 ephs/client.go create mode 100644 ephs/servicer/acl.go create mode 100644 ephs/servicer/acl_test.go create mode 100644 ephs/servicer/fs_object.go create mode 100644 ephs/servicer/s3.go create mode 100644 ephs/servicer/servicer.go create mode 100644 ephs/servicer/writer.go create mode 100644 proto/service/ephs/Makefile create mode 100644 proto/service/ephs/service.pb.go create mode 100644 proto/service/ephs/service.proto create mode 100644 proto/service/ephs/service_grpc.pb.go create mode 100644 proto/service/ephs/types.pb.go create mode 100644 proto/service/ephs/types.proto create mode 100644 utils/stringmatch/serialization2.go create mode 100644 utils/stringmatch/serialization2_test.go diff --git a/config_watcher/backend_ephs.go b/config_watcher/backend_ephs.go new file mode 100644 index 0000000..7d9d088 --- /dev/null +++ b/config_watcher/backend_ephs.go @@ -0,0 +1,65 @@ +package config_watcher + +import ( + "context" + "io" + + "go.fuhry.dev/runtime/ephs" +) + +type ephsConfigWatcher struct { + f string + client ephs.Client + ctx context.Context + cancel context.CancelFunc + ch <-chan *ephs.WatchResponse +} + +var _ ConfigWatcher = &ephsConfigWatcher{} + +func (w *ephsConfigWatcher) Next() (*Update, error) { + select { + case <-w.ch: + reader, err := w.client.Get(w.f) + if err != nil { + return nil, err + } + + contents, err := io.ReadAll(reader) + if err != nil { + return nil, err + } + + return &Update{ + Contents: contents, + }, nil + case <-w.ctx.Done(): + return nil, w.ctx.Err() + } +} + +func (w *ephsConfigWatcher) Close() error { + w.cancel() + return nil +} + +func newEphsConfigWatcher(ctx context.Context, filePath string) (*ephsConfigWatcher, error) { + client, err := ephs.DefaultClient() + if err != nil { + return nil, err + } + + ctx, cancel := context.WithCancel(ctx) + + watchCh, err := client.Watch(ctx, filePath) + + w := &ephsConfigWatcher{ + f: filePath, + client: client, + ctx: ctx, + cancel: cancel, + ch: watchCh, + } + + return w, nil +} diff --git a/config_watcher/backend_local.go b/config_watcher/backend_local.go new file mode 100644 index 0000000..5449f97 --- /dev/null +++ b/config_watcher/backend_local.go @@ -0,0 +1,94 @@ +package config_watcher + +import ( + "fmt" + "io" + "os" + "sync" + + "go.fuhry.dev/runtime/mtls/fsnotify" + "go.fuhry.dev/runtime/utils/log" +) + +type localConfigWatcher struct { + f string + written bool + mu sync.Mutex + ch chan bool + log log.Logger +} + +func (w *localConfigWatcher) Next() (*Update, error) { + if v := <-w.ch; !v { + return nil, io.EOF + } + + contents, err := os.ReadFile(w.f) + if err != nil { + return nil, err + } + return &Update{ + Contents: contents, + }, nil +} + +func (w *localConfigWatcher) Close() error { + w.mu.Lock() + defer w.mu.Unlock() + + if err := fsnotify.Unsubscribe(w.f); err != nil { + return err + } + + close(w.ch) + w.ch = nil + return nil +} + +func (w *localConfigWatcher) notif(filePath string, op fsnotify.Op) { + w.mu.Lock() + defer w.mu.Unlock() + + w.log.V(1).Debugf("event(path=%s, op=%v)", filePath, op) + + if w.ch == nil { + return + } + + if filePath != w.f { + return + } + + if op.Has(fsnotify.Write) { + w.written = true + } + if w.written && op.Has(fsnotify.Close) { + w.written = false + w.ch <- true + } +} + +var _ ConfigWatcher = &localConfigWatcher{} + +func newLocalConfigWatcher(filePath string) (*localConfigWatcher, error) { + filePath = fsnotify.RealPath(filePath) + + w := &localConfigWatcher{ + f: filePath, + ch: make(chan bool), + log: log.Default().WithPrefix(fmt.Sprintf("localConfigWatcher(%s)", filePath)), + } + + err := fsnotify.NotifyPath(filePath, w.notif) + if err != nil { + close(w.ch) + return nil, err + } + + // send first update immediately + go (func() { + w.ch <- true + })() + + return w, nil +} diff --git a/config_watcher/watcher.go b/config_watcher/watcher.go new file mode 100644 index 0000000..d719738 --- /dev/null +++ b/config_watcher/watcher.go @@ -0,0 +1,24 @@ +package config_watcher + +import ( + "context" + + "go.fuhry.dev/runtime/ephs" +) + +type ConfigWatcher interface { + Next() (*Update, error) + Close() error +} + +type Update struct { + Contents []byte +} + +func Watch(ctx context.Context, filePath string) (ConfigWatcher, error) { + if ephs.IsEphsPath(filePath) { + return newEphsConfigWatcher(ctx, filePath) + } + + return newLocalConfigWatcher(filePath) +} diff --git a/ephs/client.go b/ephs/client.go new file mode 100644 index 0000000..baba8bf --- /dev/null +++ b/ephs/client.go @@ -0,0 +1,467 @@ +package ephs + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "math" + "strings" + "sync" + "time" + + "github.com/quic-go/quic-go" + "go.fuhry.dev/runtime/grpc" + "go.fuhry.dev/runtime/mtls" + ephs_pb "go.fuhry.dev/runtime/proto/service/ephs" + "go.fuhry.dev/runtime/utils/log" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +const KeyPrefix = "/ephs/" +const ChunkSize = 262144 + +const formatEntryDateFormat = "Monday, 2 Jan 2006 15:04:05 -0700" + +type WatchResponse = ephs_pb.WatchResponse + +type Client interface { + Get(path string) (io.Reader, error) + GetContext(ctx context.Context, path string) (io.ReadCloser, error) + + Stat(path string) (*ephs_pb.FsEntry, error) + StatContext(ctx context.Context, path string) (*ephs_pb.FsEntry, error) + + Put(path string, size uint64, content io.Reader) (*ephs_pb.FsEntry, error) + PutContext(ctx context.Context, path string, size uint64, content io.Reader) (*ephs_pb.FsEntry, error) + + Delete(path string, recursive bool) error + DeleteContext(ctx context.Context, path string, recursive bool) error + + MkDir(path string, recursive bool) error + MkDirContext(ctx context.Context, path string, recursive bool) error + + Watch(ctx context.Context, path string) (<-chan *ephs_pb.WatchResponse, error) +} + +type ClientOption interface { + Apply(*clientImpl) error +} + +type genericClientOption struct { + apply func(*clientImpl) error +} + +type clientImpl struct { + defaultCtx context.Context + defaultTimeout time.Duration + id mtls.Identity + + client grpc.Client + conn *grpc.ClientConn +} + +type getReader struct { + stream ephs_pb.Ephs_GetClient + buf *bytes.Buffer + cancel context.CancelFunc +} + +var DefaultClientContext context.Context + +var ephsQuicConfig = &quic.Config{ + HandshakeIdleTimeout: 5 * time.Second, + MaxIdleTimeout: 120 * time.Minute, + KeepAlivePeriod: 5 * time.Second, +} + +var defaultClient Client +var defaultClientMu sync.Mutex + +func IsEphsPath(p string) bool { + return strings.HasPrefix(p, KeyPrefix) +} + +func FormatFsEntry(e *ephs_pb.FsEntry) string { + if e == nil { + return "" + } + + var out string + + out += fmt.Sprintf("Owner: %s\n", e.Owner) + out += fmt.Sprintf("Version: %d\n", e.Version) + out += fmt.Sprintf("Created: %s\n", e.Created.AsTime().Format(formatEntryDateFormat)) + out += fmt.Sprintf("Modified: %s\n", e.Modified.AsTime().Format(formatEntryDateFormat)) + + if e.Content.GetDirectory() != nil { + children := len(e.Content.GetDirectory().GetEntries()) + out += fmt.Sprintf("Children: %d\n", children) + if children > 0 { + out += strings.Repeat("-", 70) + "\nEntries:\n" + for _, child := range e.Content.GetDirectory().GetEntries() { + dirMarker := " " + if child.Directory { + dirMarker = "[d] " + } + + out += dirMarker + child.Name + if child.Directory { + out += "/" + } + out += "\n" + } + } + } else { + out += fmt.Sprintf("Size: %d bytes (%s)\n", e.Size, humanFilesize(e.Size)) + } + + if f := e.Content.GetFile(); f != nil { + out += "Storage: inline\n" + out += fmt.Sprintf("Compression: %s\n", f.Compression.String()) + out += fmt.Sprintf("Compressed size: %d bytes (%s)\n", len(f.Content), humanFilesize(uint64(len(f.Content)))) + } else if f := e.Content.GetLargeFile(); f != nil { + out += "Storage: S3\n" + out += fmt.Sprintf("Key: %s\n", f.Key) + } + + return out +} + +func (o *genericClientOption) Apply(c *clientImpl) error { + return o.apply(c) +} + +func WithDefaultTimeout(d time.Duration) ClientOption { + return &genericClientOption{ + apply: func(c *clientImpl) error { + c.defaultTimeout = d + return nil + }, + } +} + +func DefaultClient() (Client, error) { + defaultClientMu.Lock() + defer defaultClientMu.Unlock() + + if defaultClient != nil { + return defaultClient, nil + } + + if DefaultClientContext == nil { + DefaultClientContext = context.Background() + } + + client, err := NewClient(DefaultClientContext, mtls.DefaultIdentity()) + if err != nil { + return nil, err + } + + defaultClient = client + return defaultClient, nil +} + +func NewClient(ctx context.Context, localId mtls.Identity, opts ...ClientOption) (Client, error) { + cl := &clientImpl{ + defaultCtx: ctx, + defaultTimeout: 15 * time.Second, + id: localId, + } + + for _, opt := range opts { + if err := opt.Apply(cl); err != nil { + return nil, err + } + } + + return cl, nil +} + +func (c *clientImpl) Get(path string) (io.Reader, error) { + ctx, cancel := context.WithTimeout(c.defaultCtx, c.defaultTimeout) + + reader, err := c.GetContext(ctx, path) + if err != nil { + return nil, err + } + + reader.(*getReader).cancel = cancel + + return reader, nil +} + +func (c *clientImpl) GetContext(ctx context.Context, path string) (io.ReadCloser, error) { + rpc, err := c.grpcClient() + if err != nil { + return nil, err + } + req := &ephs_pb.GetRequest{Path: path} + stream, err := rpc.Get(ctx, req) + if err != nil { + return nil, err + } + + return &getReader{stream, &bytes.Buffer{}, nil}, nil +} + +func (r *getReader) Read(p []byte) (int, error) { + if r.buf.Len() < 1 { + msg, err := r.stream.Recv() + if err != nil { + return 0, err + } + + if len(msg.Chunk) == 0 { + return 0, io.EOF + } + + r.buf.Write(msg.Chunk) + } + + return r.buf.Read(p) +} + +func (r *getReader) Close() error { + if r.cancel != nil { + r.cancel() + } + return r.stream.CloseSend() +} + +func (c *clientImpl) Stat(path string) (*ephs_pb.FsEntry, error) { + ctx, cancel := context.WithTimeout(c.defaultCtx, c.defaultTimeout) + defer cancel() + + return c.StatContext(ctx, path) +} + +func (c *clientImpl) StatContext(ctx context.Context, path string) (*ephs_pb.FsEntry, error) { + rpc, err := c.grpcClient() + if err != nil { + return nil, err + } + + req := &ephs_pb.GetRequest{ + Path: path, + } + resp, err := rpc.Stat(ctx, req) + if err != nil { + return nil, err + } + + return resp.GetEntry(), nil +} + +func (c *clientImpl) Delete(path string, recursive bool) error { + ctx, cancel := context.WithTimeout(c.defaultCtx, c.defaultTimeout) + defer cancel() + + return c.DeleteContext(ctx, path, recursive) +} + +func (c *clientImpl) DeleteContext(ctx context.Context, path string, recursive bool) error { + rpc, err := c.grpcClient() + if err != nil { + return err + } + + req := &ephs_pb.DeleteRequest{ + Path: path, + Recursive: recursive, + } + _, err = rpc.Delete(ctx, req) + return err +} + +func (c *clientImpl) MkDir(path string, recursive bool) error { + ctx, cancel := context.WithTimeout(c.defaultCtx, c.defaultTimeout) + defer cancel() + + return c.MkDirContext(ctx, path, recursive) +} + +func (c *clientImpl) MkDirContext(ctx context.Context, path string, recursive bool) error { + rpc, err := c.grpcClient() + if err != nil { + return err + } + + req := &ephs_pb.MkDirRequest{ + Path: path, + Recursive: recursive, + } + _, err = rpc.MkDir(ctx, req) + return err +} + +func (c *clientImpl) Put(path string, size uint64, r io.Reader) (*ephs_pb.FsEntry, error) { + ctx, cancel := context.WithTimeout(c.defaultCtx, c.defaultTimeout) + defer cancel() + + return c.PutContext(ctx, path, size, r) +} + +func (c *clientImpl) PutContext(ctx context.Context, path string, size uint64, r io.Reader) (*ephs_pb.FsEntry, error) { + rpc, err := c.grpcClient() + if err != nil { + return nil, err + } + + req := &ephs_pb.PutRequest{ + Path: path, + Size: size, + Version: 0, + Chunk: make([]byte, min(size, ChunkSize)), + } + stream, err := rpc.Put(ctx) + if err != nil { + return nil, err + } + + for nw := uint64(0); nw < size; { + n, err := r.Read(req.Chunk) + if n == 0 || err == io.EOF { + break + } + if err != nil { + stream.CloseSend() + return nil, err + } + req.Chunk = req.Chunk[:n] + err = stream.Send(req) + if err != nil { + return nil, err + } + nw += uint64(n) + if err == io.EOF { + break + } + } + + response, err := stream.CloseAndRecv() + if err != nil && err != io.EOF { + return nil, err + } + + if response != nil && response.GetEntry() != nil { + return response.GetEntry(), nil + } else { + return nil, errors.New("upload succeeded but received nil response") + } +} + +func (c *clientImpl) Watch(ctx context.Context, path string) (<-chan *ephs_pb.WatchResponse, error) { + rpc, err := c.grpcClient() + if err != nil { + return nil, err + } + + req := &ephs_pb.GetRequest{ + Path: path, + } + stream, err := rpc.Watch(ctx, req) + if err != nil { + return nil, err + } + + ch := make(chan *ephs_pb.WatchResponse) + + go c.watch(ctx, path, stream, ch) + + return ch, nil +} + +func (c *clientImpl) watch(origCtx context.Context, origPath string, stream ephs_pb.Ephs_WatchClient, ch chan *ephs_pb.WatchResponse) { + defer close(ch) + + for { + msg, err := stream.Recv() + if err != nil { + if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable { + stream.CloseSend() + + if rpc, err := c.grpcClient(); err == nil { + stream, err = rpc.Watch(origCtx, &ephs_pb.GetRequest{Path: origPath}) + if err == nil { + log.Default().Noticef("reconnected watcher with new stream") + continue + } else { + log.Default().Errorf("error reestablishing watch stream: %v", err) + } + } else { + log.Default().Errorf("error reconnecting: %v", err) + } + } + log.Default().Errorf("error receiving watch stream: %v", err) + return + } + + timer := time.NewTimer(c.defaultTimeout) + select { + case ch <- msg: + // do nothing - we've written to the channel and that's all we require + case <-timer.C: + return + } + } +} + +func (c *clientImpl) grpcClient() (ephs_pb.EphsClient, error) { + var err error + serverId := mtls.NewServiceIdentity("ephs") + + if c.client == nil { + c.client, err = grpc.NewGrpcClient(c.defaultCtx, serverId, c.id, + grpc.WithConnectionFactory(&grpc.QUICConnectionFactory{ + QUICConfig: ephsQuicConfig.Clone(), + })) + if err != nil { + return nil, fmt.Errorf("error creating grpc client: %T: %v", err, err) + } + } + + deadline, ok := c.defaultCtx.Deadline() + if !ok { + deadline = time.Now().Add(60 * time.Second) + } + var ( + conn *grpc.ClientConn + lastErr error + ) + + for time.Now().Before(deadline) { + conn, lastErr = c.client.Conn() + if lastErr == nil { + break + } + log.Default().Warningf("error establishing grpc connection to ephs server, retrying in 1s: %T: %v", lastErr, lastErr) + select { + case <-c.defaultCtx.Done(): + log.Default().Error(c.defaultCtx.Err()) + return nil, c.defaultCtx.Err() + case <-time.NewTimer(1 * time.Second).C: + } + } + if lastErr != nil { + return nil, lastErr + } + + confFsCl := ephs_pb.NewEphsClient(conn) + return confFsCl, nil +} + +func humanFilesize(s uint64) string { + if s > uint64(0.9*math.Pow(2, 40)) { + return fmt.Sprintf("%.2f TiB", float64(s)/math.Pow(2, 40)) + } else if s > uint64(0.9*math.Pow(2, 30)) { + return fmt.Sprintf("%.2f GiB", float64(s)/math.Pow(2, 30)) + } else if s > uint64(0.9*math.Pow(2, 20)) { + return fmt.Sprintf("%.2f MiB", float64(s)/math.Pow(2, 20)) + } else if s > uint64(0.9*math.Pow(2, 10)) { + return fmt.Sprintf("%.2f KiB", float64(s)/math.Pow(2, 10)) + } + return fmt.Sprintf("%d bytes", s) +} diff --git a/ephs/servicer/acl.go b/ephs/servicer/acl.go new file mode 100644 index 0000000..e9cf055 --- /dev/null +++ b/ephs/servicer/acl.go @@ -0,0 +1,109 @@ +package servicer + +import ( + "bytes" + "errors" + "fmt" + "os" + + "gopkg.in/yaml.v3" + + "go.fuhry.dev/runtime/utils/stringmatch" +) + +type AclRule struct { + Principal *stringmatch.NewSyntaxMatchRule `yaml:"principal"` + Key *stringmatch.NewSyntaxMatchRule `yaml:"key"` + Invert bool `yaml:"invert"` +} + +type Acl struct { + Rules []*AclRule `yaml:"rules"` +} + +func LoadAcl(filePath string) (*Acl, error) { + fp, err := os.OpenFile(filePath, os.O_RDONLY, os.FileMode(0)) + if err != nil { + return nil, err + } + defer fp.Close() + + acl := &Acl{} + decoder := yaml.NewDecoder(fp) + if err := decoder.Decode(acl); err != nil { + return nil, err + } + + if err := acl.Validate(); err != nil { + return nil, err + } + + return acl, nil +} + +func LoadAclString(text string) (*Acl, error) { + buf := bytes.NewBufferString(text) + acl := &Acl{} + decoder := yaml.NewDecoder(buf) + if err := decoder.Decode(acl); err != nil { + return nil, err + } + + if err := acl.Validate(); err != nil { + return nil, err + } + + return acl, nil +} + +func (r *AclRule) Match(principal, key string) bool { + if !r.Principal.Match(principal) { + return false + } + + keyMatcher, err := r.Key.Matcher() + if err != nil { + return false + } + + keyMatcher = keyMatcher.Sub(map[string]string{"principal": principal}) + + return keyMatcher.Match(key) +} + +func (a *Acl) Check(principal, key string) bool { + for _, r := range a.Rules { + if r.Match(principal, key) { + return !r.Invert + } + } + + return false +} + +func (a *Acl) Validate() error { + if len(a.Rules) < 1 { + return errors.New("access control list is empty") + } + + var errs []error + for i, rule := range a.Rules { + if rule.Key == nil { + errs = append(errs, fmt.Errorf("rule %d missing key", i)) + } else { + if _, err := rule.Key.Matcher(); err != nil { + errs = append(errs, fmt.Errorf("rule %d key: %v", i, err)) + } + } + + if rule.Principal == nil { + errs = append(errs, fmt.Errorf("rule %d missing principal", i)) + } else { + if _, err := rule.Principal.Matcher(); err != nil { + errs = append(errs, fmt.Errorf("rule %d principal: %v", i, err)) + } + } + } + + return errors.Join(errs...) +} diff --git a/ephs/servicer/acl_test.go b/ephs/servicer/acl_test.go new file mode 100644 index 0000000..d744c7a --- /dev/null +++ b/ephs/servicer/acl_test.go @@ -0,0 +1,103 @@ +package servicer + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +const testRules = ` +rules: + - principal: + exact: bob + key: + exact: /{{principal}}/can/write/this + - principal: + any: true + key: + exact: /writable/by/self/{{principal}} + - principal: + or: + - exact: bob + - exact: suzie + key: + prefix: /or/test/{{principal}}/ +` + +func TestAcl(t *testing.T) { + type testCase struct { + description, princ, key string + expect bool + } + + acl, err := LoadAclString(testRules) + assert.NoError(t, err) + + testCases := []testCase{ + { + "bob has access to /bob/can/write/this", + "bob", + "/bob/can/write/this", + true, + }, + { + "bob does not have access to /suzie/can/write/this", + "bob", + "/suzie/can/write/this", + false, + }, + { + "suzie does not have access to /suzie/can/write/this", + "suzie", + "/suzie/can/write/this", + false, + }, + { + "bob has access to /writable/by/self/bob", + "bob", + "/writable/by/self/bob", + true, + }, + { + "suzie has access to /writable/by/self/suzie", + "suzie", + "/writable/by/self/suzie", + true, + }, + { + "bob has access to /writable/by/self/suzie", + "suzie", + "/writable/by/self/suzie", + true, + }, + { + "bob has access to /or/test/bob/foo", + "bob", + "/or/test/bob/foo", + true, + }, + { + "suzie has access to /or/test/suzie/foo", + "suzie", + "/or/test/suzie/foo", + true, + }, + { + "frank does not have access to /or/test/frank/foo", + "frank", + "/or/test/frank/foo", + false, + }, + { + "bob does not have access /no/rule/for/this", + "bob", + "/no/rule/for/this", + false, + }, + } + + for i, tc := range testCases { + assert.Equal(t, tc.expect, acl.Check(tc.princ, tc.key), + "test case %d: %s", i, tc.description) + } +} diff --git a/ephs/servicer/fs_object.go b/ephs/servicer/fs_object.go new file mode 100644 index 0000000..94df6c2 --- /dev/null +++ b/ephs/servicer/fs_object.go @@ -0,0 +1,204 @@ +package servicer + +import ( + "bytes" + "compress/gzip" + "context" + "errors" + "fmt" + "io" + "math/rand" + "regexp" + + "github.com/minio/minio-go/v7" + "go.fuhry.dev/runtime/ephs" + ephs_proto "go.fuhry.dev/runtime/proto/service/ephs" + + "go.fuhry.dev/runtime/utils/log" +) + +const LargeFileThreshold = ephs.ChunkSize + +type FsEntry struct { + *ephs_proto.FsEntry +} + +var ErrDirectory = errors.New("is a directory") +var ErrUnknownCompression = errors.New("file compressed with unknown scheme") + +var RegexpLargeFileKey = regexp.MustCompile(`^[0-9A-Za-z_-]{64}$`) + +func (e *FsEntry) GetContents(ctx context.Context, s3 *minio.Client) (io.Reader, error) { + switch { + case e.FsEntry.Content.GetDirectory() != nil: + return nil, ErrDirectory + case e.FsEntry.Content.GetFile() != nil: + return e.readFile() + case e.FsEntry.Content.GetLargeFile() != nil: + return e.readLargeFile(ctx, s3) + default: + return nil, errors.New("file content not populated with any known type") + } +} + +func (e *FsEntry) SetContents(ctx context.Context, s3 *minio.Client, r io.Reader, size uint64) error { + if e.FsEntry.Content.GetDirectory() != nil { + return ErrDirectory + } + + var oldLargeFile ephs_proto.FsEntry_LargeFile + mustCleanupLargeFile := e.FsEntry.Content.GetLargeFile() != nil + if mustCleanupLargeFile { + oldLargeFile = *e.FsEntry.Content.GetLargeFile() + } + + var err error + if size >= LargeFileThreshold { + err = e.writeLargeFile(ctx, s3, r, size) + } else { + err = e.writeFile(r, size) + } + if err != nil { + return err + } + + if mustCleanupLargeFile { + log.Default().Infof("mustCleanupLargeFile: %+v", oldLargeFile) + + err = e.cleanupOldLargeFile(ctx, s3, &oldLargeFile) + if err != nil { + log.Default().Alert( + "error while cleaning up old large file with key %q: %v", + oldLargeFile.Key, + err) + } + } + + return nil +} + +func (e *FsEntry) DeleteContents(ctx context.Context, s3 *minio.Client) error { + switch { + case e.FsEntry.Content.GetDirectory() != nil: + return ErrDirectory + case e.FsEntry.Content.GetFile() != nil: + e.FsEntry.Content = nil + return nil + case e.FsEntry.Content.GetLargeFile() != nil: + return e.cleanupOldLargeFile(ctx, s3, e.Content.GetLargeFile()) + default: + return errors.New("file content not populated with any known type") + } +} + +func (e *FsEntry) readFile() (io.Reader, error) { + file := e.FsEntry.Content.GetFile() + if file == nil { + return nil, errors.New("internal error: readFile() called but file is unpopulated") + } + + switch file.Compression { + case ephs_proto.FsEntry_File_UNCOMPRESSED: + return bytes.NewBuffer(file.GetContent()), nil + case ephs_proto.FsEntry_File_GZIP: + return gzip.NewReader(bytes.NewBuffer(file.GetContent())) + default: + return nil, ErrUnknownCompression + } +} + +func (e *FsEntry) readLargeFile(ctx context.Context, s3 *minio.Client) (io.Reader, error) { + if e.Content == nil || e.Content.GetLargeFile() == nil { + return nil, errors.New("not a large file") + } + + key := e.Content.GetLargeFile().GetKey() + if !RegexpLargeFileKey.MatchString(key) { + return nil, errors.New("large file key contains invalid characters or is the wrong length") + } + + return s3.GetObject(ctx, s3Bucket, s3Prefix+key, minio.GetObjectOptions{}) +} + +func (e *FsEntry) writeFile(r io.Reader, size uint64) error { + var compression = ephs_proto.FsEntry_File_GZIP + + if f := e.FsEntry.Content.GetFile(); f != nil { + compression = f.Compression + } + + f := &ephs_proto.FsEntry_File{ + Compression: compression, + } + + switch compression { + case ephs_proto.FsEntry_File_UNCOMPRESSED: + var err error + f.Content, err = io.ReadAll(r) + if err != nil { + return err + } + if uint64(len(f.Content)) != size { + return fmt.Errorf("underrun writing file: expected %d bytes, wrote %d", size, len(f.Content)) + } + case ephs_proto.FsEntry_File_GZIP: + buf := &bytes.Buffer{} + gz := gzip.NewWriter(buf) + nw, _ := io.Copy(gz, r) + gz.Close() + f.Content = append(f.Content, buf.Bytes()...) + + if nw != int64(size) { + return fmt.Errorf("underrun writing file: expected %d bytes, wrote %d", size, nw) + } + } + + e.FsEntry.Content = &ephs_proto.FsEntry_Content{ + Content: &ephs_proto.FsEntry_Content_File{ + File: f, + }, + } + + return nil +} + +func (e *FsEntry) writeLargeFile(ctx context.Context, s3 *minio.Client, r io.Reader, size uint64) error { + key := newLargeFileKey() + + log.Default().Noticef("attempting to PutObject into bucket %s at key %s", s3Bucket, key) + upload, err := s3.PutObject(ctx, s3Bucket, s3Prefix+key, r, int64(size), minio.PutObjectOptions{}) + if err != nil { + log.Default().Error(err) + return err + } + + log.Default().Infof("upload info: %+v", upload) + + e.FsEntry.Content = &ephs_proto.FsEntry_Content{ + Content: &ephs_proto.FsEntry_Content_LargeFile{ + LargeFile: &ephs_proto.FsEntry_LargeFile{ + Key: key, + }, + }, + } + + return nil +} + +func (e *FsEntry) cleanupOldLargeFile(ctx context.Context, s3 *minio.Client, lf *ephs_proto.FsEntry_LargeFile) error { + key := lf.GetKey() + if !RegexpLargeFileKey.MatchString(key) { + return errors.New("large file key contains invalid characters or is the wrong length") + } + + return s3.RemoveObject(ctx, s3Bucket, s3Prefix+key, minio.RemoveObjectOptions{}) +} + +func newLargeFileKey() string { + const charset = `abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-` + var out []byte + for i := 0; i < 64; i++ { + out = append(out, charset[rand.Intn(len(charset))]) + } + return string(out) +} diff --git a/ephs/servicer/s3.go b/ephs/servicer/s3.go new file mode 100644 index 0000000..caebdd3 --- /dev/null +++ b/ephs/servicer/s3.go @@ -0,0 +1,49 @@ +package servicer + +import ( + "errors" + "flag" + + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" + "go.fuhry.dev/runtime/constants" +) + +var s3Endpoint = "s3." + constants.WebServicesDomain +var s3Bucket = "ephs" +var s3Prefix = "" + +func WithAWSEnvCredentials() Option { + return &genericOption{ + apply: func(s *ephsServicer) error { + s.s3Creds = credentials.NewEnvAWS() + return nil + }, + } +} + +func WithAWSCredentialFile(filename string) Option { + return &genericOption{ + apply: func(s *ephsServicer) error { + s.s3Creds = credentials.NewFileAWSCredentials(filename, "default") + return nil + }, + } +} + +func (s *ephsServicer) newS3Client() (*minio.Client, error) { + if !flag.Parsed() { + return nil, errors.New("flags not yet parsed") + } + + return minio.New(s3Endpoint, &minio.Options{ + Creds: s.s3Creds, + Secure: true, + }) +} + +func init() { + flag.StringVar(&s3Endpoint, "ephs.s3-endpoint", s3Endpoint, "S3 endpoint") + flag.StringVar(&s3Bucket, "ephs.s3-bucket", s3Bucket, "S3 bucket name") + flag.StringVar(&s3Prefix, "ephs.s3-prefix", s3Prefix, "S3 object prefix") +} diff --git a/ephs/servicer/servicer.go b/ephs/servicer/servicer.go new file mode 100644 index 0000000..a95d72a --- /dev/null +++ b/ephs/servicer/servicer.go @@ -0,0 +1,709 @@ +package servicer + +import ( + "bytes" + "context" + "errors" + "flag" + "fmt" + "io" + "path" + "slices" + "strings" + "sync" + + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" + etcd_client "go.etcd.io/etcd/client/v3" + etcd_concurrency "go.etcd.io/etcd/client/v3/concurrency" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/emptypb" + "google.golang.org/protobuf/types/known/timestamppb" + + "go.fuhry.dev/runtime/ephs" + "go.fuhry.dev/runtime/grpc" + "go.fuhry.dev/runtime/mtls" + "go.fuhry.dev/runtime/mtls/fsnotify" + ephs_proto "go.fuhry.dev/runtime/proto/service/ephs" + "go.fuhry.dev/runtime/sd" + "go.fuhry.dev/runtime/utils/hostname" + "go.fuhry.dev/runtime/utils/log" +) + +type Option interface { + Apply(*ephsServicer) error +} + +type genericOption struct { + apply func(*ephsServicer) error +} + +type msgWithPath interface { + GetPath() string +} + +type ephsServicer struct { + ephs_proto.EphsServer + + acl *Acl + logger log.Logger + clientMu sync.Mutex + clients map[string]*etcd_client.Client + s3Creds *credentials.Credentials + s3Client *minio.Client +} + +var cell string + +func (o *genericOption) Apply(servicer *ephsServicer) error { + return o.apply(servicer) +} + +func WithAcl(acl *Acl) Option { + return &genericOption{ + apply: func(s *ephsServicer) error { + s.acl = acl + return nil + }, + } +} + +func (s *ephsServicer) watchAcl(aclFile string) { + handle := func(filePath string, op fsnotify.Op) { + if filePath != aclFile { + return + } + if acl, err := LoadAcl(filePath); err == nil { + s.acl = acl + s.logger.Noticef("reloaded ACLs from %s", filePath) + } else { + s.logger.Warningf("error reloading ACLs from %s, rules not reloaded: %v", filePath, err) + } + } + fsnotify.NotifyPath(aclFile, handle) +} + +func WithAclFile(aclFile string) Option { + return &genericOption{ + apply: func(s *ephsServicer) error { + acl, err := LoadAcl(aclFile) + if err != nil { + return err + } + s.logger.Noticef("loaded %d ACL rules from %s", + len(acl.Rules), + aclFile) + s.acl = acl + s.watchAcl(aclFile) + return nil + }, + } +} + +func NewEphsServicer(opts ...Option) (ephs_proto.EphsServer, error) { + serv := &ephsServicer{ + clients: make(map[string]*etcd_client.Client), + logger: log.Default().WithPrefix("ephsServicer"), + } + if _, err := serv.clientForCell(cell); err != nil { + return nil, err + } + for _, o := range opts { + if err := o.Apply(serv); err != nil { + return nil, err + } + } + + s3, err := serv.newS3Client() + if err != nil { + return nil, err + } + serv.s3Client = s3 + + return serv, nil +} + +func (s *ephsServicer) clientForCell(cell string) (*etcd_client.Client, error) { + s.clientMu.Lock() + defer s.clientMu.Unlock() + + if _, ok := s.clients[cell]; !ok { + client, err := sd.NewEtcdClient( + mtls.DefaultIdentity(), + cell, + ) + if err != nil { + return nil, err + } + s.clients[cell] = client + } + + return s.clients[cell], nil +} + +func (s *ephsServicer) checkPath(ctx context.Context, path string) (string, error) { + if !ephs.IsEphsPath(path) { + return "", status.Errorf(codes.InvalidArgument, "path %q must start with "+ephs.KeyPrefix, path) + } + + parts := strings.Split(path, "/") + + if len(parts) < 3 { + return "", status.Errorf( + codes.InvalidArgument, + "path must be at least 3 levels deep: %q", + path) + } + + for i, part := range parts { + if (i > 0 && part == "") || part == "." || part == ".." { + return "", status.Errorf(codes.InvalidArgument, + "path %q is invalid: may not contain empty elements, '.' or '..'", path) + } + } + + ident, err := identityFromContext(ctx) + if err != nil { + return "", err + } + + aclPath := strings.Join(parts[2:], "/") + + if s.acl != nil { + if !s.acl.Check(ident, aclPath) { + return "", status.Errorf( + codes.PermissionDenied, + "access to path %q denied for %q", + aclPath, + ident) + } + } + + if parts[2] == "local" { + parts[2] = cell + } + + return strings.Join(parts, "/"), nil +} + +func (s *ephsServicer) Stat(ctx context.Context, req *ephs_proto.GetRequest) (*ephs_proto.StatResponse, error) { + s.logRequest(ctx, "Stat", req) + entry, err := s.getPath(ctx, req.Path) + if err != nil { + return nil, err + } + + return &ephs_proto.StatResponse{ + Entry: entry, + }, nil +} + +func (s *ephsServicer) getPath(ctx context.Context, path string) (*ephs_proto.FsEntry, error) { + var err error + if path, err = s.checkPath(ctx, path); err != nil { + return nil, err + } + + cell := strings.Split(path, "/")[2] + etcd, err := s.clientForCell(cell) + if err != nil { + return nil, err + } + + obj, err := etcd.Get(ctx, path) + if err != nil { + return nil, err + } + + if len(obj.Kvs) < 1 { + return nil, status.Errorf( + codes.NotFound, + "object does not exist: %q", + path) + } + + entry := &ephs_proto.FsEntry{} + if err := proto.Unmarshal(obj.Kvs[0].Value, entry); err != nil { + return nil, status.Errorf( + codes.DataLoss, + "failed to unmarshal key %q as %s: %v", + obj.Kvs[0].Key, + proto.MessageName(entry), + err) + } + + return entry, nil +} + +func (s *ephsServicer) modifyDirectory(ctx context.Context, path string, apply func(*ephs_proto.FsEntry_Directory, etcd_concurrency.STM) error) error { + var err error + if path, err = s.checkPath(ctx, path); err != nil { + return err + } + + cell := strings.Split(path, "/")[2] + etcd, err := s.clientForCell(cell) + if err != nil { + return err + } + + txnResp, err := etcd_concurrency.NewSTM(etcd, func(stm etcd_concurrency.STM) error { + dirRaw := []byte(stm.Get(path)) + dir := newDir(ctx) + if len(dirRaw) > 0 { + if err := proto.Unmarshal(dirRaw, dir); err != nil { + return fmt.Errorf("error unmarshaling FsEntry at %q: %v", path, err) + } + } + + if dir.Content.GetDirectory() == nil { + return fmt.Errorf("ephs path %q: not a directory", path) + } + + if err = apply(dir.Content.GetDirectory(), stm); err != nil { + return fmt.Errorf("error performing atomic directory modification: %v", err) + } + + dir.Modified = timestamppb.Now() + dir.Version += 1 + + if dirRaw, err = proto.Marshal(dir); err != nil { + return fmt.Errorf("error re-marshaling FsEntry at %q after calling apply: %v", path, err) + } + + stm.Put(path, string(dirRaw)) + + return nil + }) + + s.logger.Debugf("transaction status modifying directory %q: %+v", path, txnResp) + + return err +} + +func (s *ephsServicer) logRequest(ctx context.Context, method string, req msgWithPath) { + id, err := identityFromContext(ctx) + if err != nil { + return + } + + s.logger.AppendPrefix("]["+id).Infof("%s(%q)", method, req.GetPath()) +} + +func (s *ephsServicer) Get(req *ephs_proto.GetRequest, server ephs_proto.Ephs_GetServer) error { + ctx := server.Context() + s.logRequest(ctx, "Get", req) + + entry, err := s.getPath(ctx, req.Path) + if err != nil { + return err + } + + if entry.Content.GetDirectory() != nil { + return status.Errorf( + codes.FailedPrecondition, + "%q: is a directory", + req.Path) + } + + entryCopy := *entry + + response := &ephs_proto.GetResponse{ + Entry: &entryCopy, + } + + response.Entry.Content = nil + + ent := &FsEntry{entry} + reader, err := ent.GetContents(ctx, s.s3Client) + if err != nil { + return err + } + + writer := &responseWriter{server: server, response: response} + n, err := io.Copy(writer, reader) + if err != nil { + s.logger.Error(err) + return err + } + s.logger.V(1).Noticef("Get(%q): wrote %d bytes", req.Path, n) + + return nil +} + +func (s *ephsServicer) mkdir(ctx context.Context, pathName string, recursive bool) error { + parts := strings.Split(pathName, "/") + + cell := parts[2] + etcd, err := s.clientForCell(cell) + if err != nil { + return err + } + + txn, err := etcd_concurrency.NewSTM(etcd, func(stm etcd_concurrency.STM) error { + var ( + dir *ephs_proto.FsEntry = newDir(ctx) + err error + ) + for i := 3; i < len(parts); i++ { + parent := strings.Join(parts[:i], "/") + + dirRaw := []byte(stm.Get(parent)) + if len(dirRaw) == 0 { + if !recursive { + return status.Errorf(codes.NotFound, "cannot mkdir %q: parent directory %q does not exist and recursion is disabled", pathName, parent) + } + + dir = newDir(ctx) + } else { + err = proto.Unmarshal(dirRaw, dir) + if err != nil { + return status.Errorf(codes.Internal, + "error unmarshaling parent directory %q: %T: %v", + parent, err, err) + } + + if dir.Content.GetDirectory() == nil { + return status.Errorf( + codes.FailedPrecondition, + "cannot mkdir %q: parent item %q is not a directory (raw len: %d)", + pathName, parent, len(dirRaw)) + } + + dir.Modified = timestamppb.Now() + dir.Version += 1 + } + + dEnt := dir.Content.GetDirectory() + + dEnt.Entries = append(dEnt.Entries, &ephs_proto.FsEntry_Directory_DirectoryEntry{ + Name: parts[i], + Directory: true, + }) + + dirRaw, err = proto.Marshal(dir) + if err != nil { + return status.Errorf(codes.Internal, + "error marshaling parent directory %q: %T: %v", + parent, err, err) + } + + stm.Put(parent, string(dirRaw)) + } + + entry := []byte(stm.Get(pathName)) + if len(entry) > 0 { + return status.Errorf(codes.AlreadyExists, + "cannot create directory %q: already exists", + pathName) + } + + dirRaw, err := proto.Marshal(newDir(ctx)) + if err != nil { + return status.Errorf(codes.Internal, + "error marshaling new directory %q: %T: %v", + pathName, err, err) + } + + stm.Put(pathName, string(dirRaw)) + return nil + }) + if err != nil { + return err + } + if !txn.Succeeded { + return fmt.Errorf("transaction failed: %+v", txn) + } + + return nil +} + +func (s *ephsServicer) MkDir(ctx context.Context, req *ephs_proto.MkDirRequest) (*emptypb.Empty, error) { + path, err := s.checkPath(ctx, req.Path) + if err != nil { + return nil, err + } + + return &emptypb.Empty{}, s.mkdir(ctx, path, req.Recursive) +} + +func (s *ephsServicer) Delete(ctx context.Context, req *ephs_proto.DeleteRequest) (*emptypb.Empty, error) { + s.logRequest(ctx, "Delete", req) + targetPath, err := s.checkPath(ctx, req.Path) + if err != nil { + return nil, err + } + + entry, err := s.getPath(ctx, targetPath) + if err != nil { + return nil, err + } + + fse := &FsEntry{entry} + if fse.Content.GetDirectory() != nil { + if len(fse.Content.GetDirectory().Entries) > 0 && !req.Recursive { + return nil, status.Errorf(codes.FailedPrecondition, + "refusing to delete non-empty directory %q without recursion flag set", + targetPath) + } + + for _, ent := range fse.Content.GetDirectory().Entries { + subReq := &ephs_proto.DeleteRequest{ + Path: path.Join(targetPath, ent.Name), + Recursive: true, + } + _, err = s.Delete(ctx, subReq) + if err != nil { + return nil, err + } + } + } else { + if err := fse.DeleteContents(ctx, s.s3Client); err != nil { + return nil, err + } + } + + dir := path.Dir(targetPath) + err = s.modifyDirectory(ctx, dir, func(dEnt *ephs_proto.FsEntry_Directory, stm etcd_concurrency.STM) error { + index := -1 + for i, ent := range dEnt.Entries { + if ent.Name == path.Base(targetPath) { + index = i + break + } + } + if index >= 0 { + dEnt.Entries = slices.Delete(dEnt.Entries, index, index+1) + } + stm.Del(targetPath) + return nil + }) + if err != nil { + return nil, err + } + + return &emptypb.Empty{}, nil +} + +func (s *ephsServicer) Put(server ephs_proto.Ephs_PutServer) error { + msg, err := server.Recv() + if err != nil { + return err + } + + ctx := server.Context() + s.logRequest(ctx, "Put", msg) + + peerId, err := identityFromContext(ctx) + if err != nil { + return err + } + + targetPath, err := s.checkPath(ctx, msg.Path) + if err != nil { + return err + } + + dir := path.Dir(targetPath) + _, err = s.getPath(ctx, dir) + if err != nil { + if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound { + return status.Errorf( + codes.FailedPrecondition, + "cannot write %q: parent directory %q does not exist", + targetPath, dir) + } + return err + } + + pbEnt, err := s.getPath(ctx, targetPath) + if err != nil { + if status, ok := status.FromError(err); ok && status.Code() == codes.NotFound { + // NotFound is only returned after all access checks have succeeded. Ok to + // create file. + now := timestamppb.Now() + pbEnt = &ephs_proto.FsEntry{ + Created: now, + Modified: now, + Version: msg.Version - 1, + Size: msg.Size, + Owner: peerId, + } + } else { + // some other error besides not-found + return err + } + } else { + if msg.Version != 0 && msg.Version != (pbEnt.Version+1) { + return status.Errorf( + codes.FailedPrecondition, + "version conflict: attempted to overwrite version %d with %d, "+ + "version number must be current version + 1", + pbEnt.Version, + msg.Version) + } + } + ent := &FsEntry{pbEnt} + if pbEnt.Content.GetDirectory() != nil { + return status.Errorf(codes.FailedPrecondition, + "cannot write %q: path already exists and is a directory", + targetPath) + } + + reader := &putReader{ + initialMsg: msg, + server: server, + buf: &bytes.Buffer{}, + } + + if err := ent.SetContents(ctx, s.s3Client, reader, msg.Size); err != nil { + return err + } + + ent.FsEntry.Modified = timestamppb.Now() + ent.FsEntry.Size = msg.Size + ent.FsEntry.Version++ + + marshaled, err := proto.Marshal(ent.FsEntry) + if err != nil { + return err + } + + err = s.modifyDirectory(ctx, dir, func(dEnt *ephs_proto.FsEntry_Directory, stm etcd_concurrency.STM) error { + stm.Put(targetPath, string(marshaled)) + base := path.Base(targetPath) + for _, ent := range dEnt.Entries { + if ent.Name == base { + return nil + } + } + + dEnt.Entries = append(dEnt.Entries, &ephs_proto.FsEntry_Directory_DirectoryEntry{ + Name: base, + Directory: false, + }) + + return nil + }) + + if err != nil { + _ = ent.DeleteContents(ctx, s.s3Client) + return err + } + + response := &ephs_proto.StatResponse{ + Entry: ent.FsEntry, + } + return server.SendAndClose(response) +} + +func (s *ephsServicer) Watch(req *ephs_proto.GetRequest, server ephs_proto.Ephs_WatchServer) error { + ctx := etcd_client.WithRequireLeader(server.Context()) + s.logRequest(ctx, "Watch", req) + path, err := s.checkPath(ctx, req.Path) + if err != nil { + return err + } + + response := &ephs_proto.WatchResponse{ + Entry: nil, + Event: ephs_proto.WatchResponse_DELETE, + } + + if resp, err := s.getPath(ctx, path); err == nil { + response.Entry = resp + response.Event = ephs_proto.WatchResponse_CREATE + } + + cell := strings.Split(path, "/")[2] + etcd, err := s.clientForCell(cell) + if err != nil { + return err + } + + if err := server.Send(response); err != nil { + return err + } + + for ctx.Err() == nil { + watcher := etcd.Watch(ctx, path) + for msg := range watcher { + s.logger.Debugf("watcher: got event: %+v", msg) + for _, event := range msg.Events { + if string(event.Kv.Key) != path { + continue + } + + response.Entry = &ephs_proto.FsEntry{} + + if event.IsCreate() { + response.Event = ephs_proto.WatchResponse_CREATE + } else if event.IsModify() { + response.Event = ephs_proto.WatchResponse_MODIFY + } else if event.Type == etcd_client.EventTypeDelete { + response.Entry = nil + response.Event = ephs_proto.WatchResponse_DELETE + } else { + continue + } + + if response.Entry != nil { + if err = proto.Unmarshal(event.Kv.Value, response.Entry); err != nil { + s.logger.Errorf("protobuf marshal error: %v", err) + return err + } + } + + if err := server.Send(response); err != nil { + s.logger.Errorf("stream send error: %v", err) + return err + } + } + } + } + + s.logger.Noticef("Watch(%q) ending: ctx err: %v", path, ctx.Err()) + + if errors.Is(ctx.Err(), context.Canceled) { + return status.Error(codes.Unavailable, "server is shutting down, please reconnect") + } + + return nil +} + +func identityFromContext(ctx context.Context) (string, error) { + peer, ok := peer.FromContext(ctx) + if !ok { + return "", status.Error(codes.Unauthenticated, "who are you??") + } + + ident, err := grpc.PeerIdentity(peer) + if err != nil { + return "", status.Errorf(codes.PermissionDenied, "cannot determine your identity from peer info: %v", peer.AuthInfo) + } + + return ident.String(), nil +} + +func newDir(ctx context.Context) *ephs_proto.FsEntry { + whoami, _ := identityFromContext(ctx) + + return &ephs_proto.FsEntry{ + Content: &ephs_proto.FsEntry_Content{ + Content: &ephs_proto.FsEntry_Content_Directory{ + Directory: &ephs_proto.FsEntry_Directory{}, + }, + }, + Owner: whoami, + Created: timestamppb.Now(), + Modified: timestamppb.Now(), + Version: 1, + } +} + +func init() { + flag.StringVar(&cell, "ephs.cell", hostname.DomainName(), "ephs cell") +} diff --git a/ephs/servicer/writer.go b/ephs/servicer/writer.go new file mode 100644 index 0000000..6cccc6c --- /dev/null +++ b/ephs/servicer/writer.go @@ -0,0 +1,67 @@ +package servicer + +import ( + "bytes" + "fmt" + + ephs_proto "go.fuhry.dev/runtime/proto/service/ephs" + "go.fuhry.dev/runtime/utils/log" +) + +type responseWriter struct { + server ephs_proto.Ephs_GetServer + response *ephs_proto.GetResponse +} + +func (w *responseWriter) Write(p []byte) (int, error) { + log.Default().V(3).Debugf("responseWriter: writing %d bytes", len(p)) + chunkLen := min(len(p), LargeFileThreshold) + + w.response.Chunk = make([]byte, chunkLen) + copy(w.response.Chunk, p) + + if err := w.server.Send(w.response); err != nil { + log.Default().Error(err) + return 0, err + } + + log.Default().V(2).Noticef("wrote %d bytes", chunkLen) + return chunkLen, nil +} + +type putReader struct { + initialMsg *ephs_proto.PutRequest + server ephs_proto.Ephs_PutServer + buf *bytes.Buffer +} + +func (r *putReader) Read(p []byte) (int, error) { + var msg *ephs_proto.PutRequest + if r.buf.Len() > 0 { + n, err := r.buf.Read(p) + return n, err + } else if r.initialMsg != nil { + msg = r.initialMsg + r.initialMsg = nil + } else { + var err error + msg, err = r.server.Recv() + if err != nil { + return 0, err + } + } + + nw, err := r.buf.Write(msg.Chunk) + if err != nil { + return 0, err + } + if nw != len(msg.Chunk) { + return 0, fmt.Errorf( + "overrun on bytes.Buffer putReader: expected to write %d bytes, "+ + "but only wrote %d", + len(r.initialMsg.Chunk), + nw) + } + + return r.buf.Read(p) +} diff --git a/proto/service/ephs/Makefile b/proto/service/ephs/Makefile new file mode 100644 index 0000000..501225c --- /dev/null +++ b/proto/service/ephs/Makefile @@ -0,0 +1,14 @@ +PROTO_SRCS := $(wildcard *.proto) +PROTO_GO_OUTPUT := $(PROTO_SRCS:.proto=.pb.go) $(PROTO_SRCS:.proto=_grpc.pb.go) + +$(PROTO_GO_OUTPUT): $(PROTO_SRCS) + protoc --go_out=. --go_opt=paths=source_relative \ + --go-grpc_out=. --go-grpc_opt=paths=source_relative \ + $(PROTO_SRCS) + +pb_go: $(PROTO_GO_OUTPUT) + +all: pb_go + +clean: + rm -fv $(PROTO_GO_OUTPUT) diff --git a/proto/service/ephs/service.pb.go b/proto/service/ephs/service.pb.go new file mode 100644 index 0000000..7e7eb77 --- /dev/null +++ b/proto/service/ephs/service.pb.go @@ -0,0 +1,559 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.10 +// protoc v6.32.1 +// source: service.proto + +package ephs + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + emptypb "google.golang.org/protobuf/types/known/emptypb" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type WatchResponse_Event int32 + +const ( + WatchResponse_UNKNOWN WatchResponse_Event = 0 + WatchResponse_CREATE WatchResponse_Event = 1 + WatchResponse_MODIFY WatchResponse_Event = 2 + WatchResponse_DELETE WatchResponse_Event = 3 +) + +// Enum value maps for WatchResponse_Event. +var ( + WatchResponse_Event_name = map[int32]string{ + 0: "UNKNOWN", + 1: "CREATE", + 2: "MODIFY", + 3: "DELETE", + } + WatchResponse_Event_value = map[string]int32{ + "UNKNOWN": 0, + "CREATE": 1, + "MODIFY": 2, + "DELETE": 3, + } +) + +func (x WatchResponse_Event) Enum() *WatchResponse_Event { + p := new(WatchResponse_Event) + *p = x + return p +} + +func (x WatchResponse_Event) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (WatchResponse_Event) Descriptor() protoreflect.EnumDescriptor { + return file_service_proto_enumTypes[0].Descriptor() +} + +func (WatchResponse_Event) Type() protoreflect.EnumType { + return &file_service_proto_enumTypes[0] +} + +func (x WatchResponse_Event) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use WatchResponse_Event.Descriptor instead. +func (WatchResponse_Event) EnumDescriptor() ([]byte, []int) { + return file_service_proto_rawDescGZIP(), []int{6, 0} +} + +type GetRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetRequest) Reset() { + *x = GetRequest{} + mi := &file_service_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetRequest) ProtoMessage() {} + +func (x *GetRequest) ProtoReflect() protoreflect.Message { + mi := &file_service_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetRequest.ProtoReflect.Descriptor instead. +func (*GetRequest) Descriptor() ([]byte, []int) { + return file_service_proto_rawDescGZIP(), []int{0} +} + +func (x *GetRequest) GetPath() string { + if x != nil { + return x.Path + } + return "" +} + +type DeleteRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` + Recursive bool `protobuf:"varint,2,opt,name=recursive,proto3" json:"recursive,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *DeleteRequest) Reset() { + *x = DeleteRequest{} + mi := &file_service_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *DeleteRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DeleteRequest) ProtoMessage() {} + +func (x *DeleteRequest) ProtoReflect() protoreflect.Message { + mi := &file_service_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DeleteRequest.ProtoReflect.Descriptor instead. +func (*DeleteRequest) Descriptor() ([]byte, []int) { + return file_service_proto_rawDescGZIP(), []int{1} +} + +func (x *DeleteRequest) GetPath() string { + if x != nil { + return x.Path + } + return "" +} + +func (x *DeleteRequest) GetRecursive() bool { + if x != nil { + return x.Recursive + } + return false +} + +type MkDirRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` + Recursive bool `protobuf:"varint,2,opt,name=recursive,proto3" json:"recursive,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *MkDirRequest) Reset() { + *x = MkDirRequest{} + mi := &file_service_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *MkDirRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*MkDirRequest) ProtoMessage() {} + +func (x *MkDirRequest) ProtoReflect() protoreflect.Message { + mi := &file_service_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use MkDirRequest.ProtoReflect.Descriptor instead. +func (*MkDirRequest) Descriptor() ([]byte, []int) { + return file_service_proto_rawDescGZIP(), []int{2} +} + +func (x *MkDirRequest) GetPath() string { + if x != nil { + return x.Path + } + return "" +} + +func (x *MkDirRequest) GetRecursive() bool { + if x != nil { + return x.Recursive + } + return false +} + +type GetResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Entry *FsEntry `protobuf:"bytes,1,opt,name=entry,proto3" json:"entry,omitempty"` + Chunk []byte `protobuf:"bytes,2,opt,name=chunk,proto3" json:"chunk,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetResponse) Reset() { + *x = GetResponse{} + mi := &file_service_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetResponse) ProtoMessage() {} + +func (x *GetResponse) ProtoReflect() protoreflect.Message { + mi := &file_service_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetResponse.ProtoReflect.Descriptor instead. +func (*GetResponse) Descriptor() ([]byte, []int) { + return file_service_proto_rawDescGZIP(), []int{3} +} + +func (x *GetResponse) GetEntry() *FsEntry { + if x != nil { + return x.Entry + } + return nil +} + +func (x *GetResponse) GetChunk() []byte { + if x != nil { + return x.Chunk + } + return nil +} + +type StatResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Entry *FsEntry `protobuf:"bytes,1,opt,name=entry,proto3" json:"entry,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *StatResponse) Reset() { + *x = StatResponse{} + mi := &file_service_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *StatResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StatResponse) ProtoMessage() {} + +func (x *StatResponse) ProtoReflect() protoreflect.Message { + mi := &file_service_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StatResponse.ProtoReflect.Descriptor instead. +func (*StatResponse) Descriptor() ([]byte, []int) { + return file_service_proto_rawDescGZIP(), []int{4} +} + +func (x *StatResponse) GetEntry() *FsEntry { + if x != nil { + return x.Entry + } + return nil +} + +type PutRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` + Version uint64 `protobuf:"varint,2,opt,name=version,proto3" json:"version,omitempty"` + Size uint64 `protobuf:"varint,3,opt,name=size,proto3" json:"size,omitempty"` + Chunk []byte `protobuf:"bytes,4,opt,name=chunk,proto3" json:"chunk,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *PutRequest) Reset() { + *x = PutRequest{} + mi := &file_service_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *PutRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PutRequest) ProtoMessage() {} + +func (x *PutRequest) ProtoReflect() protoreflect.Message { + mi := &file_service_proto_msgTypes[5] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PutRequest.ProtoReflect.Descriptor instead. +func (*PutRequest) Descriptor() ([]byte, []int) { + return file_service_proto_rawDescGZIP(), []int{5} +} + +func (x *PutRequest) GetPath() string { + if x != nil { + return x.Path + } + return "" +} + +func (x *PutRequest) GetVersion() uint64 { + if x != nil { + return x.Version + } + return 0 +} + +func (x *PutRequest) GetSize() uint64 { + if x != nil { + return x.Size + } + return 0 +} + +func (x *PutRequest) GetChunk() []byte { + if x != nil { + return x.Chunk + } + return nil +} + +type WatchResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Entry *FsEntry `protobuf:"bytes,1,opt,name=entry,proto3" json:"entry,omitempty"` + Event WatchResponse_Event `protobuf:"varint,2,opt,name=event,proto3,enum=fuhry.runtime.service.ephs.WatchResponse_Event" json:"event,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *WatchResponse) Reset() { + *x = WatchResponse{} + mi := &file_service_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *WatchResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WatchResponse) ProtoMessage() {} + +func (x *WatchResponse) ProtoReflect() protoreflect.Message { + mi := &file_service_proto_msgTypes[6] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WatchResponse.ProtoReflect.Descriptor instead. +func (*WatchResponse) Descriptor() ([]byte, []int) { + return file_service_proto_rawDescGZIP(), []int{6} +} + +func (x *WatchResponse) GetEntry() *FsEntry { + if x != nil { + return x.Entry + } + return nil +} + +func (x *WatchResponse) GetEvent() WatchResponse_Event { + if x != nil { + return x.Event + } + return WatchResponse_UNKNOWN +} + +var File_service_proto protoreflect.FileDescriptor + +const file_service_proto_rawDesc = "" + + "\n" + + "\rservice.proto\x12\x1afuhry.runtime.service.ephs\x1a\x1bgoogle/protobuf/empty.proto\x1a\vtypes.proto\" \n" + + "\n" + + "GetRequest\x12\x12\n" + + "\x04path\x18\x01 \x01(\tR\x04path\"A\n" + + "\rDeleteRequest\x12\x12\n" + + "\x04path\x18\x01 \x01(\tR\x04path\x12\x1c\n" + + "\trecursive\x18\x02 \x01(\bR\trecursive\"@\n" + + "\fMkDirRequest\x12\x12\n" + + "\x04path\x18\x01 \x01(\tR\x04path\x12\x1c\n" + + "\trecursive\x18\x02 \x01(\bR\trecursive\"^\n" + + "\vGetResponse\x129\n" + + "\x05entry\x18\x01 \x01(\v2#.fuhry.runtime.service.ephs.FsEntryR\x05entry\x12\x14\n" + + "\x05chunk\x18\x02 \x01(\fR\x05chunk\"I\n" + + "\fStatResponse\x129\n" + + "\x05entry\x18\x01 \x01(\v2#.fuhry.runtime.service.ephs.FsEntryR\x05entry\"d\n" + + "\n" + + "PutRequest\x12\x12\n" + + "\x04path\x18\x01 \x01(\tR\x04path\x12\x18\n" + + "\aversion\x18\x02 \x01(\x04R\aversion\x12\x12\n" + + "\x04size\x18\x03 \x01(\x04R\x04size\x12\x14\n" + + "\x05chunk\x18\x04 \x01(\fR\x05chunk\"\xcb\x01\n" + + "\rWatchResponse\x129\n" + + "\x05entry\x18\x01 \x01(\v2#.fuhry.runtime.service.ephs.FsEntryR\x05entry\x12E\n" + + "\x05event\x18\x02 \x01(\x0e2/.fuhry.runtime.service.ephs.WatchResponse.EventR\x05event\"8\n" + + "\x05Event\x12\v\n" + + "\aUNKNOWN\x10\x00\x12\n" + + "\n" + + "\x06CREATE\x10\x01\x12\n" + + "\n" + + "\x06MODIFY\x10\x02\x12\n" + + "\n" + + "\x06DELETE\x10\x032\x97\x04\n" + + "\x04Ephs\x12Z\n" + + "\x03Get\x12&.fuhry.runtime.service.ephs.GetRequest\x1a'.fuhry.runtime.service.ephs.GetResponse\"\x000\x01\x12[\n" + + "\x03Put\x12&.fuhry.runtime.service.ephs.PutRequest\x1a(.fuhry.runtime.service.ephs.StatResponse\"\x00(\x01\x12Z\n" + + "\x04Stat\x12&.fuhry.runtime.service.ephs.GetRequest\x1a(.fuhry.runtime.service.ephs.StatResponse\"\x00\x12^\n" + + "\x05Watch\x12&.fuhry.runtime.service.ephs.GetRequest\x1a).fuhry.runtime.service.ephs.WatchResponse\"\x000\x01\x12K\n" + + "\x05MkDir\x12(.fuhry.runtime.service.ephs.MkDirRequest\x1a\x16.google.protobuf.Empty\"\x00\x12M\n" + + "\x06Delete\x12).fuhry.runtime.service.ephs.DeleteRequest\x1a\x16.google.protobuf.Empty\"\x00B)Z'go.fuhry.dev/runtime/proto/service/ephsb\x06proto3" + +var ( + file_service_proto_rawDescOnce sync.Once + file_service_proto_rawDescData []byte +) + +func file_service_proto_rawDescGZIP() []byte { + file_service_proto_rawDescOnce.Do(func() { + file_service_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_service_proto_rawDesc), len(file_service_proto_rawDesc))) + }) + return file_service_proto_rawDescData +} + +var file_service_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_service_proto_msgTypes = make([]protoimpl.MessageInfo, 7) +var file_service_proto_goTypes = []any{ + (WatchResponse_Event)(0), // 0: fuhry.runtime.service.ephs.WatchResponse.Event + (*GetRequest)(nil), // 1: fuhry.runtime.service.ephs.GetRequest + (*DeleteRequest)(nil), // 2: fuhry.runtime.service.ephs.DeleteRequest + (*MkDirRequest)(nil), // 3: fuhry.runtime.service.ephs.MkDirRequest + (*GetResponse)(nil), // 4: fuhry.runtime.service.ephs.GetResponse + (*StatResponse)(nil), // 5: fuhry.runtime.service.ephs.StatResponse + (*PutRequest)(nil), // 6: fuhry.runtime.service.ephs.PutRequest + (*WatchResponse)(nil), // 7: fuhry.runtime.service.ephs.WatchResponse + (*FsEntry)(nil), // 8: fuhry.runtime.service.ephs.FsEntry + (*emptypb.Empty)(nil), // 9: google.protobuf.Empty +} +var file_service_proto_depIdxs = []int32{ + 8, // 0: fuhry.runtime.service.ephs.GetResponse.entry:type_name -> fuhry.runtime.service.ephs.FsEntry + 8, // 1: fuhry.runtime.service.ephs.StatResponse.entry:type_name -> fuhry.runtime.service.ephs.FsEntry + 8, // 2: fuhry.runtime.service.ephs.WatchResponse.entry:type_name -> fuhry.runtime.service.ephs.FsEntry + 0, // 3: fuhry.runtime.service.ephs.WatchResponse.event:type_name -> fuhry.runtime.service.ephs.WatchResponse.Event + 1, // 4: fuhry.runtime.service.ephs.Ephs.Get:input_type -> fuhry.runtime.service.ephs.GetRequest + 6, // 5: fuhry.runtime.service.ephs.Ephs.Put:input_type -> fuhry.runtime.service.ephs.PutRequest + 1, // 6: fuhry.runtime.service.ephs.Ephs.Stat:input_type -> fuhry.runtime.service.ephs.GetRequest + 1, // 7: fuhry.runtime.service.ephs.Ephs.Watch:input_type -> fuhry.runtime.service.ephs.GetRequest + 3, // 8: fuhry.runtime.service.ephs.Ephs.MkDir:input_type -> fuhry.runtime.service.ephs.MkDirRequest + 2, // 9: fuhry.runtime.service.ephs.Ephs.Delete:input_type -> fuhry.runtime.service.ephs.DeleteRequest + 4, // 10: fuhry.runtime.service.ephs.Ephs.Get:output_type -> fuhry.runtime.service.ephs.GetResponse + 5, // 11: fuhry.runtime.service.ephs.Ephs.Put:output_type -> fuhry.runtime.service.ephs.StatResponse + 5, // 12: fuhry.runtime.service.ephs.Ephs.Stat:output_type -> fuhry.runtime.service.ephs.StatResponse + 7, // 13: fuhry.runtime.service.ephs.Ephs.Watch:output_type -> fuhry.runtime.service.ephs.WatchResponse + 9, // 14: fuhry.runtime.service.ephs.Ephs.MkDir:output_type -> google.protobuf.Empty + 9, // 15: fuhry.runtime.service.ephs.Ephs.Delete:output_type -> google.protobuf.Empty + 10, // [10:16] is the sub-list for method output_type + 4, // [4:10] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name +} + +func init() { file_service_proto_init() } +func file_service_proto_init() { + if File_service_proto != nil { + return + } + file_types_proto_init() + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_service_proto_rawDesc), len(file_service_proto_rawDesc)), + NumEnums: 1, + NumMessages: 7, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_service_proto_goTypes, + DependencyIndexes: file_service_proto_depIdxs, + EnumInfos: file_service_proto_enumTypes, + MessageInfos: file_service_proto_msgTypes, + }.Build() + File_service_proto = out.File + file_service_proto_goTypes = nil + file_service_proto_depIdxs = nil +} diff --git a/proto/service/ephs/service.proto b/proto/service/ephs/service.proto new file mode 100644 index 0000000..4f50b6a --- /dev/null +++ b/proto/service/ephs/service.proto @@ -0,0 +1,57 @@ +syntax = "proto3"; +import "google/protobuf/empty.proto"; + +option go_package = "go.fuhry.dev/runtime/proto/service/ephs"; + +package fuhry.runtime.service.ephs; + +message GetRequest { + string path = 1; +} + +message DeleteRequest { + string path = 1; + bool recursive = 2; +} + +message MkDirRequest { + string path = 1; + bool recursive = 2; +} + +message GetResponse { + FsEntry entry = 1; + bytes chunk = 2; +} + +message StatResponse { + FsEntry entry = 1; +} + +message PutRequest { + string path = 1; + uint64 version = 2; + uint64 size = 3; + bytes chunk = 4; +} + +message WatchResponse { + enum Event { + UNKNOWN = 0; + CREATE = 1; + MODIFY = 2; + DELETE = 3; + } + + FsEntry entry = 1; + Event event = 2; +} + +service Ephs { + rpc Get (GetRequest) returns (stream GetResponse) {} + rpc Put (stream PutRequest) returns (StatResponse) {} + rpc Stat (GetRequest) returns (StatResponse) {} + rpc Watch (GetRequest) returns (stream WatchResponse) {} + rpc MkDir (MkDirRequest) returns (google.protobuf.Empty) {} + rpc Delete (DeleteRequest) returns (google.protobuf.Empty) {} +} diff --git a/proto/service/ephs/service_grpc.pb.go b/proto/service/ephs/service_grpc.pb.go new file mode 100644 index 0000000..f02bb0d --- /dev/null +++ b/proto/service/ephs/service_grpc.pb.go @@ -0,0 +1,384 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc v6.32.1 +// source: service.proto + +package ephs + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + emptypb "google.golang.org/protobuf/types/known/emptypb" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const ( + Ephs_Get_FullMethodName = "/fuhry.runtime.service.ephs.Ephs/Get" + Ephs_Put_FullMethodName = "/fuhry.runtime.service.ephs.Ephs/Put" + Ephs_Stat_FullMethodName = "/fuhry.runtime.service.ephs.Ephs/Stat" + Ephs_Watch_FullMethodName = "/fuhry.runtime.service.ephs.Ephs/Watch" + Ephs_MkDir_FullMethodName = "/fuhry.runtime.service.ephs.Ephs/MkDir" + Ephs_Delete_FullMethodName = "/fuhry.runtime.service.ephs.Ephs/Delete" +) + +// EphsClient is the client API for Ephs service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type EphsClient interface { + Get(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (Ephs_GetClient, error) + Put(ctx context.Context, opts ...grpc.CallOption) (Ephs_PutClient, error) + Stat(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (*StatResponse, error) + Watch(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (Ephs_WatchClient, error) + MkDir(ctx context.Context, in *MkDirRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) + Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) +} + +type ephsClient struct { + cc grpc.ClientConnInterface +} + +func NewEphsClient(cc grpc.ClientConnInterface) EphsClient { + return &ephsClient{cc} +} + +func (c *ephsClient) Get(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (Ephs_GetClient, error) { + stream, err := c.cc.NewStream(ctx, &Ephs_ServiceDesc.Streams[0], Ephs_Get_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &ephsGetClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Ephs_GetClient interface { + Recv() (*GetResponse, error) + grpc.ClientStream +} + +type ephsGetClient struct { + grpc.ClientStream +} + +func (x *ephsGetClient) Recv() (*GetResponse, error) { + m := new(GetResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *ephsClient) Put(ctx context.Context, opts ...grpc.CallOption) (Ephs_PutClient, error) { + stream, err := c.cc.NewStream(ctx, &Ephs_ServiceDesc.Streams[1], Ephs_Put_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &ephsPutClient{stream} + return x, nil +} + +type Ephs_PutClient interface { + Send(*PutRequest) error + CloseAndRecv() (*StatResponse, error) + grpc.ClientStream +} + +type ephsPutClient struct { + grpc.ClientStream +} + +func (x *ephsPutClient) Send(m *PutRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *ephsPutClient) CloseAndRecv() (*StatResponse, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(StatResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *ephsClient) Stat(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (*StatResponse, error) { + out := new(StatResponse) + err := c.cc.Invoke(ctx, Ephs_Stat_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *ephsClient) Watch(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (Ephs_WatchClient, error) { + stream, err := c.cc.NewStream(ctx, &Ephs_ServiceDesc.Streams[2], Ephs_Watch_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &ephsWatchClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Ephs_WatchClient interface { + Recv() (*WatchResponse, error) + grpc.ClientStream +} + +type ephsWatchClient struct { + grpc.ClientStream +} + +func (x *ephsWatchClient) Recv() (*WatchResponse, error) { + m := new(WatchResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *ephsClient) MkDir(ctx context.Context, in *MkDirRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, Ephs_MkDir_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *ephsClient) Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, Ephs_Delete_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// EphsServer is the server API for Ephs service. +// All implementations must embed UnimplementedEphsServer +// for forward compatibility +type EphsServer interface { + Get(*GetRequest, Ephs_GetServer) error + Put(Ephs_PutServer) error + Stat(context.Context, *GetRequest) (*StatResponse, error) + Watch(*GetRequest, Ephs_WatchServer) error + MkDir(context.Context, *MkDirRequest) (*emptypb.Empty, error) + Delete(context.Context, *DeleteRequest) (*emptypb.Empty, error) + mustEmbedUnimplementedEphsServer() +} + +// UnimplementedEphsServer must be embedded to have forward compatible implementations. +type UnimplementedEphsServer struct { +} + +func (UnimplementedEphsServer) Get(*GetRequest, Ephs_GetServer) error { + return status.Errorf(codes.Unimplemented, "method Get not implemented") +} +func (UnimplementedEphsServer) Put(Ephs_PutServer) error { + return status.Errorf(codes.Unimplemented, "method Put not implemented") +} +func (UnimplementedEphsServer) Stat(context.Context, *GetRequest) (*StatResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Stat not implemented") +} +func (UnimplementedEphsServer) Watch(*GetRequest, Ephs_WatchServer) error { + return status.Errorf(codes.Unimplemented, "method Watch not implemented") +} +func (UnimplementedEphsServer) MkDir(context.Context, *MkDirRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method MkDir not implemented") +} +func (UnimplementedEphsServer) Delete(context.Context, *DeleteRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method Delete not implemented") +} +func (UnimplementedEphsServer) mustEmbedUnimplementedEphsServer() {} + +// UnsafeEphsServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to EphsServer will +// result in compilation errors. +type UnsafeEphsServer interface { + mustEmbedUnimplementedEphsServer() +} + +func RegisterEphsServer(s grpc.ServiceRegistrar, srv EphsServer) { + s.RegisterService(&Ephs_ServiceDesc, srv) +} + +func _Ephs_Get_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(GetRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(EphsServer).Get(m, &ephsGetServer{stream}) +} + +type Ephs_GetServer interface { + Send(*GetResponse) error + grpc.ServerStream +} + +type ephsGetServer struct { + grpc.ServerStream +} + +func (x *ephsGetServer) Send(m *GetResponse) error { + return x.ServerStream.SendMsg(m) +} + +func _Ephs_Put_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(EphsServer).Put(&ephsPutServer{stream}) +} + +type Ephs_PutServer interface { + SendAndClose(*StatResponse) error + Recv() (*PutRequest, error) + grpc.ServerStream +} + +type ephsPutServer struct { + grpc.ServerStream +} + +func (x *ephsPutServer) SendAndClose(m *StatResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *ephsPutServer) Recv() (*PutRequest, error) { + m := new(PutRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func _Ephs_Stat_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(EphsServer).Stat(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Ephs_Stat_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(EphsServer).Stat(ctx, req.(*GetRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Ephs_Watch_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(GetRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(EphsServer).Watch(m, &ephsWatchServer{stream}) +} + +type Ephs_WatchServer interface { + Send(*WatchResponse) error + grpc.ServerStream +} + +type ephsWatchServer struct { + grpc.ServerStream +} + +func (x *ephsWatchServer) Send(m *WatchResponse) error { + return x.ServerStream.SendMsg(m) +} + +func _Ephs_MkDir_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(MkDirRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(EphsServer).MkDir(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Ephs_MkDir_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(EphsServer).MkDir(ctx, req.(*MkDirRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Ephs_Delete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(DeleteRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(EphsServer).Delete(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Ephs_Delete_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(EphsServer).Delete(ctx, req.(*DeleteRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// Ephs_ServiceDesc is the grpc.ServiceDesc for Ephs service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Ephs_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "fuhry.runtime.service.ephs.Ephs", + HandlerType: (*EphsServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Stat", + Handler: _Ephs_Stat_Handler, + }, + { + MethodName: "MkDir", + Handler: _Ephs_MkDir_Handler, + }, + { + MethodName: "Delete", + Handler: _Ephs_Delete_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "Get", + Handler: _Ephs_Get_Handler, + ServerStreams: true, + }, + { + StreamName: "Put", + Handler: _Ephs_Put_Handler, + ClientStreams: true, + }, + { + StreamName: "Watch", + Handler: _Ephs_Watch_Handler, + ServerStreams: true, + }, + }, + Metadata: "service.proto", +} diff --git a/proto/service/ephs/types.pb.go b/proto/service/ephs/types.pb.go new file mode 100644 index 0000000..c7210eb --- /dev/null +++ b/proto/service/ephs/types.pb.go @@ -0,0 +1,545 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.10 +// protoc v6.32.1 +// source: types.proto + +package ephs + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type FsEntry_File_Compression int32 + +const ( + FsEntry_File_UNCOMPRESSED FsEntry_File_Compression = 0 + FsEntry_File_GZIP FsEntry_File_Compression = 1 +) + +// Enum value maps for FsEntry_File_Compression. +var ( + FsEntry_File_Compression_name = map[int32]string{ + 0: "UNCOMPRESSED", + 1: "GZIP", + } + FsEntry_File_Compression_value = map[string]int32{ + "UNCOMPRESSED": 0, + "GZIP": 1, + } +) + +func (x FsEntry_File_Compression) Enum() *FsEntry_File_Compression { + p := new(FsEntry_File_Compression) + *p = x + return p +} + +func (x FsEntry_File_Compression) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (FsEntry_File_Compression) Descriptor() protoreflect.EnumDescriptor { + return file_types_proto_enumTypes[0].Descriptor() +} + +func (FsEntry_File_Compression) Type() protoreflect.EnumType { + return &file_types_proto_enumTypes[0] +} + +func (x FsEntry_File_Compression) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use FsEntry_File_Compression.Descriptor instead. +func (FsEntry_File_Compression) EnumDescriptor() ([]byte, []int) { + return file_types_proto_rawDescGZIP(), []int{0, 1, 0} +} + +type FsEntry struct { + state protoimpl.MessageState `protogen:"open.v1"` + Created *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=created,proto3" json:"created,omitempty"` + Modified *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=modified,proto3" json:"modified,omitempty"` + Version uint64 `protobuf:"varint,3,opt,name=version,proto3" json:"version,omitempty"` + Size uint64 `protobuf:"varint,4,opt,name=size,proto3" json:"size,omitempty"` + Owner string `protobuf:"bytes,5,opt,name=owner,proto3" json:"owner,omitempty"` + Content *FsEntry_Content `protobuf:"bytes,6,opt,name=content,proto3" json:"content,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *FsEntry) Reset() { + *x = FsEntry{} + mi := &file_types_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *FsEntry) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*FsEntry) ProtoMessage() {} + +func (x *FsEntry) ProtoReflect() protoreflect.Message { + mi := &file_types_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use FsEntry.ProtoReflect.Descriptor instead. +func (*FsEntry) Descriptor() ([]byte, []int) { + return file_types_proto_rawDescGZIP(), []int{0} +} + +func (x *FsEntry) GetCreated() *timestamppb.Timestamp { + if x != nil { + return x.Created + } + return nil +} + +func (x *FsEntry) GetModified() *timestamppb.Timestamp { + if x != nil { + return x.Modified + } + return nil +} + +func (x *FsEntry) GetVersion() uint64 { + if x != nil { + return x.Version + } + return 0 +} + +func (x *FsEntry) GetSize() uint64 { + if x != nil { + return x.Size + } + return 0 +} + +func (x *FsEntry) GetOwner() string { + if x != nil { + return x.Owner + } + return "" +} + +func (x *FsEntry) GetContent() *FsEntry_Content { + if x != nil { + return x.Content + } + return nil +} + +type FsEntry_Directory struct { + state protoimpl.MessageState `protogen:"open.v1"` + Entries []*FsEntry_Directory_DirectoryEntry `protobuf:"bytes,1,rep,name=entries,proto3" json:"entries,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *FsEntry_Directory) Reset() { + *x = FsEntry_Directory{} + mi := &file_types_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *FsEntry_Directory) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*FsEntry_Directory) ProtoMessage() {} + +func (x *FsEntry_Directory) ProtoReflect() protoreflect.Message { + mi := &file_types_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use FsEntry_Directory.ProtoReflect.Descriptor instead. +func (*FsEntry_Directory) Descriptor() ([]byte, []int) { + return file_types_proto_rawDescGZIP(), []int{0, 0} +} + +func (x *FsEntry_Directory) GetEntries() []*FsEntry_Directory_DirectoryEntry { + if x != nil { + return x.Entries + } + return nil +} + +type FsEntry_File struct { + state protoimpl.MessageState `protogen:"open.v1"` + Content []byte `protobuf:"bytes,1,opt,name=content,proto3" json:"content,omitempty"` + Compression FsEntry_File_Compression `protobuf:"varint,2,opt,name=compression,proto3,enum=fuhry.runtime.service.ephs.FsEntry_File_Compression" json:"compression,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *FsEntry_File) Reset() { + *x = FsEntry_File{} + mi := &file_types_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *FsEntry_File) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*FsEntry_File) ProtoMessage() {} + +func (x *FsEntry_File) ProtoReflect() protoreflect.Message { + mi := &file_types_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use FsEntry_File.ProtoReflect.Descriptor instead. +func (*FsEntry_File) Descriptor() ([]byte, []int) { + return file_types_proto_rawDescGZIP(), []int{0, 1} +} + +func (x *FsEntry_File) GetContent() []byte { + if x != nil { + return x.Content + } + return nil +} + +func (x *FsEntry_File) GetCompression() FsEntry_File_Compression { + if x != nil { + return x.Compression + } + return FsEntry_File_UNCOMPRESSED +} + +type FsEntry_LargeFile struct { + state protoimpl.MessageState `protogen:"open.v1"` + Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *FsEntry_LargeFile) Reset() { + *x = FsEntry_LargeFile{} + mi := &file_types_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *FsEntry_LargeFile) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*FsEntry_LargeFile) ProtoMessage() {} + +func (x *FsEntry_LargeFile) ProtoReflect() protoreflect.Message { + mi := &file_types_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use FsEntry_LargeFile.ProtoReflect.Descriptor instead. +func (*FsEntry_LargeFile) Descriptor() ([]byte, []int) { + return file_types_proto_rawDescGZIP(), []int{0, 2} +} + +func (x *FsEntry_LargeFile) GetKey() string { + if x != nil { + return x.Key + } + return "" +} + +type FsEntry_Content struct { + state protoimpl.MessageState `protogen:"open.v1"` + // Types that are valid to be assigned to Content: + // + // *FsEntry_Content_Directory + // *FsEntry_Content_File + // *FsEntry_Content_LargeFile + Content isFsEntry_Content_Content `protobuf_oneof:"content"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *FsEntry_Content) Reset() { + *x = FsEntry_Content{} + mi := &file_types_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *FsEntry_Content) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*FsEntry_Content) ProtoMessage() {} + +func (x *FsEntry_Content) ProtoReflect() protoreflect.Message { + mi := &file_types_proto_msgTypes[4] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use FsEntry_Content.ProtoReflect.Descriptor instead. +func (*FsEntry_Content) Descriptor() ([]byte, []int) { + return file_types_proto_rawDescGZIP(), []int{0, 3} +} + +func (x *FsEntry_Content) GetContent() isFsEntry_Content_Content { + if x != nil { + return x.Content + } + return nil +} + +func (x *FsEntry_Content) GetDirectory() *FsEntry_Directory { + if x != nil { + if x, ok := x.Content.(*FsEntry_Content_Directory); ok { + return x.Directory + } + } + return nil +} + +func (x *FsEntry_Content) GetFile() *FsEntry_File { + if x != nil { + if x, ok := x.Content.(*FsEntry_Content_File); ok { + return x.File + } + } + return nil +} + +func (x *FsEntry_Content) GetLargeFile() *FsEntry_LargeFile { + if x != nil { + if x, ok := x.Content.(*FsEntry_Content_LargeFile); ok { + return x.LargeFile + } + } + return nil +} + +type isFsEntry_Content_Content interface { + isFsEntry_Content_Content() +} + +type FsEntry_Content_Directory struct { + Directory *FsEntry_Directory `protobuf:"bytes,1,opt,name=directory,proto3,oneof"` +} + +type FsEntry_Content_File struct { + File *FsEntry_File `protobuf:"bytes,2,opt,name=file,proto3,oneof"` +} + +type FsEntry_Content_LargeFile struct { + LargeFile *FsEntry_LargeFile `protobuf:"bytes,3,opt,name=large_file,json=largeFile,proto3,oneof"` +} + +func (*FsEntry_Content_Directory) isFsEntry_Content_Content() {} + +func (*FsEntry_Content_File) isFsEntry_Content_Content() {} + +func (*FsEntry_Content_LargeFile) isFsEntry_Content_Content() {} + +type FsEntry_Directory_DirectoryEntry struct { + state protoimpl.MessageState `protogen:"open.v1"` + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Directory bool `protobuf:"varint,2,opt,name=directory,proto3" json:"directory,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *FsEntry_Directory_DirectoryEntry) Reset() { + *x = FsEntry_Directory_DirectoryEntry{} + mi := &file_types_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *FsEntry_Directory_DirectoryEntry) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*FsEntry_Directory_DirectoryEntry) ProtoMessage() {} + +func (x *FsEntry_Directory_DirectoryEntry) ProtoReflect() protoreflect.Message { + mi := &file_types_proto_msgTypes[5] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use FsEntry_Directory_DirectoryEntry.ProtoReflect.Descriptor instead. +func (*FsEntry_Directory_DirectoryEntry) Descriptor() ([]byte, []int) { + return file_types_proto_rawDescGZIP(), []int{0, 0, 0} +} + +func (x *FsEntry_Directory_DirectoryEntry) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *FsEntry_Directory_DirectoryEntry) GetDirectory() bool { + if x != nil { + return x.Directory + } + return false +} + +var File_types_proto protoreflect.FileDescriptor + +const file_types_proto_rawDesc = "" + + "\n" + + "\vtypes.proto\x12\x1afuhry.runtime.service.ephs\x1a\x1fgoogle/protobuf/timestamp.proto\"\xe7\x06\n" + + "\aFsEntry\x124\n" + + "\acreated\x18\x01 \x01(\v2\x1a.google.protobuf.TimestampR\acreated\x126\n" + + "\bmodified\x18\x02 \x01(\v2\x1a.google.protobuf.TimestampR\bmodified\x12\x18\n" + + "\aversion\x18\x03 \x01(\x04R\aversion\x12\x12\n" + + "\x04size\x18\x04 \x01(\x04R\x04size\x12\x14\n" + + "\x05owner\x18\x05 \x01(\tR\x05owner\x12E\n" + + "\acontent\x18\x06 \x01(\v2+.fuhry.runtime.service.ephs.FsEntry.ContentR\acontent\x1a\xa7\x01\n" + + "\tDirectory\x12V\n" + + "\aentries\x18\x01 \x03(\v2<.fuhry.runtime.service.ephs.FsEntry.Directory.DirectoryEntryR\aentries\x1aB\n" + + "\x0eDirectoryEntry\x12\x12\n" + + "\x04name\x18\x01 \x01(\tR\x04name\x12\x1c\n" + + "\tdirectory\x18\x02 \x01(\bR\tdirectory\x1a\xa3\x01\n" + + "\x04File\x12\x18\n" + + "\acontent\x18\x01 \x01(\fR\acontent\x12V\n" + + "\vcompression\x18\x02 \x01(\x0e24.fuhry.runtime.service.ephs.FsEntry.File.CompressionR\vcompression\")\n" + + "\vCompression\x12\x10\n" + + "\fUNCOMPRESSED\x10\x00\x12\b\n" + + "\x04GZIP\x10\x01\x1a\x1d\n" + + "\tLargeFile\x12\x10\n" + + "\x03key\x18\x01 \x01(\tR\x03key\x1a\xf3\x01\n" + + "\aContent\x12M\n" + + "\tdirectory\x18\x01 \x01(\v2-.fuhry.runtime.service.ephs.FsEntry.DirectoryH\x00R\tdirectory\x12>\n" + + "\x04file\x18\x02 \x01(\v2(.fuhry.runtime.service.ephs.FsEntry.FileH\x00R\x04file\x12N\n" + + "\n" + + "large_file\x18\x03 \x01(\v2-.fuhry.runtime.service.ephs.FsEntry.LargeFileH\x00R\tlargeFileB\t\n" + + "\acontentB)Z'go.fuhry.dev/runtime/proto/service/ephsb\x06proto3" + +var ( + file_types_proto_rawDescOnce sync.Once + file_types_proto_rawDescData []byte +) + +func file_types_proto_rawDescGZIP() []byte { + file_types_proto_rawDescOnce.Do(func() { + file_types_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_types_proto_rawDesc), len(file_types_proto_rawDesc))) + }) + return file_types_proto_rawDescData +} + +var file_types_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_types_proto_msgTypes = make([]protoimpl.MessageInfo, 6) +var file_types_proto_goTypes = []any{ + (FsEntry_File_Compression)(0), // 0: fuhry.runtime.service.ephs.FsEntry.File.Compression + (*FsEntry)(nil), // 1: fuhry.runtime.service.ephs.FsEntry + (*FsEntry_Directory)(nil), // 2: fuhry.runtime.service.ephs.FsEntry.Directory + (*FsEntry_File)(nil), // 3: fuhry.runtime.service.ephs.FsEntry.File + (*FsEntry_LargeFile)(nil), // 4: fuhry.runtime.service.ephs.FsEntry.LargeFile + (*FsEntry_Content)(nil), // 5: fuhry.runtime.service.ephs.FsEntry.Content + (*FsEntry_Directory_DirectoryEntry)(nil), // 6: fuhry.runtime.service.ephs.FsEntry.Directory.DirectoryEntry + (*timestamppb.Timestamp)(nil), // 7: google.protobuf.Timestamp +} +var file_types_proto_depIdxs = []int32{ + 7, // 0: fuhry.runtime.service.ephs.FsEntry.created:type_name -> google.protobuf.Timestamp + 7, // 1: fuhry.runtime.service.ephs.FsEntry.modified:type_name -> google.protobuf.Timestamp + 5, // 2: fuhry.runtime.service.ephs.FsEntry.content:type_name -> fuhry.runtime.service.ephs.FsEntry.Content + 6, // 3: fuhry.runtime.service.ephs.FsEntry.Directory.entries:type_name -> fuhry.runtime.service.ephs.FsEntry.Directory.DirectoryEntry + 0, // 4: fuhry.runtime.service.ephs.FsEntry.File.compression:type_name -> fuhry.runtime.service.ephs.FsEntry.File.Compression + 2, // 5: fuhry.runtime.service.ephs.FsEntry.Content.directory:type_name -> fuhry.runtime.service.ephs.FsEntry.Directory + 3, // 6: fuhry.runtime.service.ephs.FsEntry.Content.file:type_name -> fuhry.runtime.service.ephs.FsEntry.File + 4, // 7: fuhry.runtime.service.ephs.FsEntry.Content.large_file:type_name -> fuhry.runtime.service.ephs.FsEntry.LargeFile + 8, // [8:8] is the sub-list for method output_type + 8, // [8:8] is the sub-list for method input_type + 8, // [8:8] is the sub-list for extension type_name + 8, // [8:8] is the sub-list for extension extendee + 0, // [0:8] is the sub-list for field type_name +} + +func init() { file_types_proto_init() } +func file_types_proto_init() { + if File_types_proto != nil { + return + } + file_types_proto_msgTypes[4].OneofWrappers = []any{ + (*FsEntry_Content_Directory)(nil), + (*FsEntry_Content_File)(nil), + (*FsEntry_Content_LargeFile)(nil), + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_types_proto_rawDesc), len(file_types_proto_rawDesc)), + NumEnums: 1, + NumMessages: 6, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_types_proto_goTypes, + DependencyIndexes: file_types_proto_depIdxs, + EnumInfos: file_types_proto_enumTypes, + MessageInfos: file_types_proto_msgTypes, + }.Build() + File_types_proto = out.File + file_types_proto_goTypes = nil + file_types_proto_depIdxs = nil +} diff --git a/proto/service/ephs/types.proto b/proto/service/ephs/types.proto new file mode 100644 index 0000000..b5d011b --- /dev/null +++ b/proto/service/ephs/types.proto @@ -0,0 +1,45 @@ +syntax = "proto3"; + +import "google/protobuf/timestamp.proto"; + +option go_package = "go.fuhry.dev/runtime/proto/service/ephs"; + +package fuhry.runtime.service.ephs; + +message FsEntry { + message Directory { + message DirectoryEntry { + string name = 1; + bool directory = 2; + } + repeated DirectoryEntry entries = 1; + } + + message File { + enum Compression { + UNCOMPRESSED = 0; + GZIP = 1; + } + bytes content = 1; + Compression compression = 2; + } + + message LargeFile { + string key = 1; + } + + message Content { + oneof content { + Directory directory = 1; + File file = 2; + LargeFile large_file = 3; + } + } + + google.protobuf.Timestamp created = 1; + google.protobuf.Timestamp modified = 2; + uint64 version = 3; + uint64 size = 4; + string owner = 5; + Content content = 6; +} diff --git a/utils/stringmatch/serialization2.go b/utils/stringmatch/serialization2.go new file mode 100644 index 0000000..a2bb20b --- /dev/null +++ b/utils/stringmatch/serialization2.go @@ -0,0 +1,114 @@ +package stringmatch + +import ( + "errors" + "fmt" +) + +type NewSyntaxMatchRule struct { + Prefix *struct { + V string `yaml:"prefix"` + } `yaml:",inline"` + Suffix *struct { + V string `yaml:"suffix"` + } `yaml:",inline"` + Exact *struct { + V string `yaml:"exact"` + } `yaml:",inline"` + Contains *struct { + V string `yaml:"contains"` + } `yaml:",inline"` + Regexp *struct { + V string `yaml:"regexp"` + } `yaml:",inline"` + Any *struct { + V bool `yaml:"any"` + } `yaml:",inline"` + Never *struct { + V bool `yaml:"never"` + } `yaml:",inline"` + And *struct { + V []*NewSyntaxMatchRule `yaml:"and"` + } `yaml:",inline"` + Or *struct { + V []*NewSyntaxMatchRule `yaml:"or"` + } `yaml:",inline"` +} + +func (m *NewSyntaxMatchRule) Matcher() (StringMatcher, error) { + var out StringMatcher + count := 0 + + if m.Prefix != nil { + out = Prefix(m.Prefix.V) + count++ + } + if m.Suffix != nil { + out = Suffix(m.Suffix.V) + count++ + } + if m.Exact != nil { + out = Exact(m.Exact.V) + count++ + } + if m.Contains != nil { + out = Contains(m.Contains.V) + count++ + } + if m.Regexp != nil { + out = Regexp(m.Regexp.V) + count++ + } + if m.Any != nil { + if m.Any.V != true { + return nil, errors.New("any may only be true") + } + out = Any{} + count++ + } + if m.Never != nil && m.Never.V { + if m.Never.V != true { + return nil, errors.New("never may only be true") + } + out = Never{} + count++ + } + if m.And != nil && len(m.And.V) > 0 { + count++ + out = &andMatcher{} + for i, m := range m.And.V { + mat, err := m.Matcher() + if err != nil { + return nil, fmt.Errorf("and-rule at index %d: %v", i, err) + } + out.(*andMatcher).matchers = append(out.(*andMatcher).matchers, mat) + } + } + if m.Or != nil && len(m.Or.V) > 0 { + count++ + out = &orMatcher{} + for i, m := range m.Or.V { + mat, err := m.Matcher() + if err != nil { + return nil, fmt.Errorf("or-rule at index %d: %v", i, err) + } + out.(*orMatcher).matchers = append(out.(*orMatcher).matchers, mat) + } + } + + if count != 1 { + return nil, fmt.Errorf( + "match rule must contain exactly one of: prefix, suffix, exact, " + + "contains, regexp, any, never, and, or") + } + + return out, nil +} + +func (m *NewSyntaxMatchRule) Match(v string) bool { + mat, err := m.Matcher() + if err != nil { + panic(err) + } + return mat.Match(v) +} diff --git a/utils/stringmatch/serialization2_test.go b/utils/stringmatch/serialization2_test.go new file mode 100644 index 0000000..3b5c140 --- /dev/null +++ b/utils/stringmatch/serialization2_test.go @@ -0,0 +1,88 @@ +package stringmatch + +import ( + "bytes" + "fmt" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "gopkg.in/yaml.v3" +) + +type ptr[T any] interface { + *T +} + +func mustDecode[T any, PT ptr[T]](t *testing.T, str string) PT { + out := new(T) + buf := bytes.NewBufferString(str) + decoder := yaml.NewDecoder(buf) + if err := decoder.Decode(out); err != nil { + fmt.Fprintf(os.Stderr, "error decoding yaml %q: %v", str, err) + t.FailNow() + } + + return out +} + +func TestNSMExact(t *testing.T) { + m := mustDecode[NewSyntaxMatchRule](t, "exact: meh") + assert.True(t, m.Match("meh")) +} + +func TestNSMPrefix(t *testing.T) { + m := mustDecode[NewSyntaxMatchRule](t, "prefix: meh") + assert.True(t, m.Match("mehh")) + assert.False(t, m.Match("ehh")) +} + +func TestNSMSuffix(t *testing.T) { + m := mustDecode[NewSyntaxMatchRule](t, "suffix: eh") + assert.True(t, m.Match("meh")) + assert.False(t, m.Match("ehh")) +} + +func TestNSMContains(t *testing.T) { + m := mustDecode[NewSyntaxMatchRule](t, "contains: eh") + assert.True(t, m.Match("meh")) + assert.False(t, m.Match("uh")) +} + +func TestNSMRegexp(t *testing.T) { + m := mustDecode[NewSyntaxMatchRule](t, "regexp: ^meh$") + assert.True(t, m.Match("meh")) + assert.False(t, m.Match("ehh")) +} + +func TestNSMAnd(t *testing.T) { + const rules = `and: + - prefix: f + - suffix: oo +` + m := mustDecode[NewSyntaxMatchRule](t, rules) + assert.True(t, m.Match("foo")) + assert.False(t, m.Match("boo")) +} + +func TestNSMOr(t *testing.T) { + const rules = `or: + - prefix: f + - suffix: oo +` + m := mustDecode[NewSyntaxMatchRule](t, rules) + assert.True(t, m.Match("foo")) + assert.True(t, m.Match("boo")) + assert.False(t, m.Match("bar")) +} + +func TestNSMErrors(t *testing.T) { + _, err := mustDecode[NewSyntaxMatchRule](t, "startswith: invalid").Matcher() + assert.Error(t, err) + + _, err = mustDecode[NewSyntaxMatchRule](t, "contains: a\nprefix: b").Matcher() + assert.Error(t, err) + + _, err = mustDecode[NewSyntaxMatchRule](t, "any: false\nprefix: b").Matcher() + assert.Error(t, err) +} -- 2.50.1