--- /dev/null
+package config_watcher
+
+import (
+ "context"
+ "io"
+
+ "go.fuhry.dev/runtime/ephs"
+)
+
+type ephsConfigWatcher struct {
+ f string
+ client ephs.Client
+ ctx context.Context
+ cancel context.CancelFunc
+ ch <-chan *ephs.WatchResponse
+}
+
+var _ ConfigWatcher = &ephsConfigWatcher{}
+
+func (w *ephsConfigWatcher) Next() (*Update, error) {
+ select {
+ case <-w.ch:
+ reader, err := w.client.Get(w.f)
+ if err != nil {
+ return nil, err
+ }
+
+ contents, err := io.ReadAll(reader)
+ if err != nil {
+ return nil, err
+ }
+
+ return &Update{
+ Contents: contents,
+ }, nil
+ case <-w.ctx.Done():
+ return nil, w.ctx.Err()
+ }
+}
+
+func (w *ephsConfigWatcher) Close() error {
+ w.cancel()
+ return nil
+}
+
+func newEphsConfigWatcher(ctx context.Context, filePath string) (*ephsConfigWatcher, error) {
+ client, err := ephs.DefaultClient()
+ if err != nil {
+ return nil, err
+ }
+
+ ctx, cancel := context.WithCancel(ctx)
+
+ watchCh, err := client.Watch(ctx, filePath)
+
+ w := &ephsConfigWatcher{
+ f: filePath,
+ client: client,
+ ctx: ctx,
+ cancel: cancel,
+ ch: watchCh,
+ }
+
+ return w, nil
+}
--- /dev/null
+package config_watcher
+
+import (
+ "fmt"
+ "io"
+ "os"
+ "sync"
+
+ "go.fuhry.dev/runtime/mtls/fsnotify"
+ "go.fuhry.dev/runtime/utils/log"
+)
+
+type localConfigWatcher struct {
+ f string
+ written bool
+ mu sync.Mutex
+ ch chan bool
+ log log.Logger
+}
+
+func (w *localConfigWatcher) Next() (*Update, error) {
+ if v := <-w.ch; !v {
+ return nil, io.EOF
+ }
+
+ contents, err := os.ReadFile(w.f)
+ if err != nil {
+ return nil, err
+ }
+ return &Update{
+ Contents: contents,
+ }, nil
+}
+
+func (w *localConfigWatcher) Close() error {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+
+ if err := fsnotify.Unsubscribe(w.f); err != nil {
+ return err
+ }
+
+ close(w.ch)
+ w.ch = nil
+ return nil
+}
+
+func (w *localConfigWatcher) notif(filePath string, op fsnotify.Op) {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+
+ w.log.V(1).Debugf("event(path=%s, op=%v)", filePath, op)
+
+ if w.ch == nil {
+ return
+ }
+
+ if filePath != w.f {
+ return
+ }
+
+ if op.Has(fsnotify.Write) {
+ w.written = true
+ }
+ if w.written && op.Has(fsnotify.Close) {
+ w.written = false
+ w.ch <- true
+ }
+}
+
+var _ ConfigWatcher = &localConfigWatcher{}
+
+func newLocalConfigWatcher(filePath string) (*localConfigWatcher, error) {
+ filePath = fsnotify.RealPath(filePath)
+
+ w := &localConfigWatcher{
+ f: filePath,
+ ch: make(chan bool),
+ log: log.Default().WithPrefix(fmt.Sprintf("localConfigWatcher(%s)", filePath)),
+ }
+
+ err := fsnotify.NotifyPath(filePath, w.notif)
+ if err != nil {
+ close(w.ch)
+ return nil, err
+ }
+
+ // send first update immediately
+ go (func() {
+ w.ch <- true
+ })()
+
+ return w, nil
+}
--- /dev/null
+package config_watcher
+
+import (
+ "context"
+
+ "go.fuhry.dev/runtime/ephs"
+)
+
+type ConfigWatcher interface {
+ Next() (*Update, error)
+ Close() error
+}
+
+type Update struct {
+ Contents []byte
+}
+
+func Watch(ctx context.Context, filePath string) (ConfigWatcher, error) {
+ if ephs.IsEphsPath(filePath) {
+ return newEphsConfigWatcher(ctx, filePath)
+ }
+
+ return newLocalConfigWatcher(filePath)
+}
--- /dev/null
+package ephs
+
+import (
+ "bytes"
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "math"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/quic-go/quic-go"
+ "go.fuhry.dev/runtime/grpc"
+ "go.fuhry.dev/runtime/mtls"
+ ephs_pb "go.fuhry.dev/runtime/proto/service/ephs"
+ "go.fuhry.dev/runtime/utils/log"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+const KeyPrefix = "/ephs/"
+const ChunkSize = 262144
+
+const formatEntryDateFormat = "Monday, 2 Jan 2006 15:04:05 -0700"
+
+type WatchResponse = ephs_pb.WatchResponse
+
+type Client interface {
+ Get(path string) (io.Reader, error)
+ GetContext(ctx context.Context, path string) (io.ReadCloser, error)
+
+ Stat(path string) (*ephs_pb.FsEntry, error)
+ StatContext(ctx context.Context, path string) (*ephs_pb.FsEntry, error)
+
+ Put(path string, size uint64, content io.Reader) (*ephs_pb.FsEntry, error)
+ PutContext(ctx context.Context, path string, size uint64, content io.Reader) (*ephs_pb.FsEntry, error)
+
+ Delete(path string, recursive bool) error
+ DeleteContext(ctx context.Context, path string, recursive bool) error
+
+ MkDir(path string, recursive bool) error
+ MkDirContext(ctx context.Context, path string, recursive bool) error
+
+ Watch(ctx context.Context, path string) (<-chan *ephs_pb.WatchResponse, error)
+}
+
+type ClientOption interface {
+ Apply(*clientImpl) error
+}
+
+type genericClientOption struct {
+ apply func(*clientImpl) error
+}
+
+type clientImpl struct {
+ defaultCtx context.Context
+ defaultTimeout time.Duration
+ id mtls.Identity
+
+ client grpc.Client
+ conn *grpc.ClientConn
+}
+
+type getReader struct {
+ stream ephs_pb.Ephs_GetClient
+ buf *bytes.Buffer
+ cancel context.CancelFunc
+}
+
+var DefaultClientContext context.Context
+
+var ephsQuicConfig = &quic.Config{
+ HandshakeIdleTimeout: 5 * time.Second,
+ MaxIdleTimeout: 120 * time.Minute,
+ KeepAlivePeriod: 5 * time.Second,
+}
+
+var defaultClient Client
+var defaultClientMu sync.Mutex
+
+func IsEphsPath(p string) bool {
+ return strings.HasPrefix(p, KeyPrefix)
+}
+
+func FormatFsEntry(e *ephs_pb.FsEntry) string {
+ if e == nil {
+ return "<nil>"
+ }
+
+ var out string
+
+ out += fmt.Sprintf("Owner: %s\n", e.Owner)
+ out += fmt.Sprintf("Version: %d\n", e.Version)
+ out += fmt.Sprintf("Created: %s\n", e.Created.AsTime().Format(formatEntryDateFormat))
+ out += fmt.Sprintf("Modified: %s\n", e.Modified.AsTime().Format(formatEntryDateFormat))
+
+ if e.Content.GetDirectory() != nil {
+ children := len(e.Content.GetDirectory().GetEntries())
+ out += fmt.Sprintf("Children: %d\n", children)
+ if children > 0 {
+ out += strings.Repeat("-", 70) + "\nEntries:\n"
+ for _, child := range e.Content.GetDirectory().GetEntries() {
+ dirMarker := " "
+ if child.Directory {
+ dirMarker = "[d] "
+ }
+
+ out += dirMarker + child.Name
+ if child.Directory {
+ out += "/"
+ }
+ out += "\n"
+ }
+ }
+ } else {
+ out += fmt.Sprintf("Size: %d bytes (%s)\n", e.Size, humanFilesize(e.Size))
+ }
+
+ if f := e.Content.GetFile(); f != nil {
+ out += "Storage: inline\n"
+ out += fmt.Sprintf("Compression: %s\n", f.Compression.String())
+ out += fmt.Sprintf("Compressed size: %d bytes (%s)\n", len(f.Content), humanFilesize(uint64(len(f.Content))))
+ } else if f := e.Content.GetLargeFile(); f != nil {
+ out += "Storage: S3\n"
+ out += fmt.Sprintf("Key: %s\n", f.Key)
+ }
+
+ return out
+}
+
+func (o *genericClientOption) Apply(c *clientImpl) error {
+ return o.apply(c)
+}
+
+func WithDefaultTimeout(d time.Duration) ClientOption {
+ return &genericClientOption{
+ apply: func(c *clientImpl) error {
+ c.defaultTimeout = d
+ return nil
+ },
+ }
+}
+
+func DefaultClient() (Client, error) {
+ defaultClientMu.Lock()
+ defer defaultClientMu.Unlock()
+
+ if defaultClient != nil {
+ return defaultClient, nil
+ }
+
+ if DefaultClientContext == nil {
+ DefaultClientContext = context.Background()
+ }
+
+ client, err := NewClient(DefaultClientContext, mtls.DefaultIdentity())
+ if err != nil {
+ return nil, err
+ }
+
+ defaultClient = client
+ return defaultClient, nil
+}
+
+func NewClient(ctx context.Context, localId mtls.Identity, opts ...ClientOption) (Client, error) {
+ cl := &clientImpl{
+ defaultCtx: ctx,
+ defaultTimeout: 15 * time.Second,
+ id: localId,
+ }
+
+ for _, opt := range opts {
+ if err := opt.Apply(cl); err != nil {
+ return nil, err
+ }
+ }
+
+ return cl, nil
+}
+
+func (c *clientImpl) Get(path string) (io.Reader, error) {
+ ctx, cancel := context.WithTimeout(c.defaultCtx, c.defaultTimeout)
+
+ reader, err := c.GetContext(ctx, path)
+ if err != nil {
+ return nil, err
+ }
+
+ reader.(*getReader).cancel = cancel
+
+ return reader, nil
+}
+
+func (c *clientImpl) GetContext(ctx context.Context, path string) (io.ReadCloser, error) {
+ rpc, err := c.grpcClient()
+ if err != nil {
+ return nil, err
+ }
+ req := &ephs_pb.GetRequest{Path: path}
+ stream, err := rpc.Get(ctx, req)
+ if err != nil {
+ return nil, err
+ }
+
+ return &getReader{stream, &bytes.Buffer{}, nil}, nil
+}
+
+func (r *getReader) Read(p []byte) (int, error) {
+ if r.buf.Len() < 1 {
+ msg, err := r.stream.Recv()
+ if err != nil {
+ return 0, err
+ }
+
+ if len(msg.Chunk) == 0 {
+ return 0, io.EOF
+ }
+
+ r.buf.Write(msg.Chunk)
+ }
+
+ return r.buf.Read(p)
+}
+
+func (r *getReader) Close() error {
+ if r.cancel != nil {
+ r.cancel()
+ }
+ return r.stream.CloseSend()
+}
+
+func (c *clientImpl) Stat(path string) (*ephs_pb.FsEntry, error) {
+ ctx, cancel := context.WithTimeout(c.defaultCtx, c.defaultTimeout)
+ defer cancel()
+
+ return c.StatContext(ctx, path)
+}
+
+func (c *clientImpl) StatContext(ctx context.Context, path string) (*ephs_pb.FsEntry, error) {
+ rpc, err := c.grpcClient()
+ if err != nil {
+ return nil, err
+ }
+
+ req := &ephs_pb.GetRequest{
+ Path: path,
+ }
+ resp, err := rpc.Stat(ctx, req)
+ if err != nil {
+ return nil, err
+ }
+
+ return resp.GetEntry(), nil
+}
+
+func (c *clientImpl) Delete(path string, recursive bool) error {
+ ctx, cancel := context.WithTimeout(c.defaultCtx, c.defaultTimeout)
+ defer cancel()
+
+ return c.DeleteContext(ctx, path, recursive)
+}
+
+func (c *clientImpl) DeleteContext(ctx context.Context, path string, recursive bool) error {
+ rpc, err := c.grpcClient()
+ if err != nil {
+ return err
+ }
+
+ req := &ephs_pb.DeleteRequest{
+ Path: path,
+ Recursive: recursive,
+ }
+ _, err = rpc.Delete(ctx, req)
+ return err
+}
+
+func (c *clientImpl) MkDir(path string, recursive bool) error {
+ ctx, cancel := context.WithTimeout(c.defaultCtx, c.defaultTimeout)
+ defer cancel()
+
+ return c.MkDirContext(ctx, path, recursive)
+}
+
+func (c *clientImpl) MkDirContext(ctx context.Context, path string, recursive bool) error {
+ rpc, err := c.grpcClient()
+ if err != nil {
+ return err
+ }
+
+ req := &ephs_pb.MkDirRequest{
+ Path: path,
+ Recursive: recursive,
+ }
+ _, err = rpc.MkDir(ctx, req)
+ return err
+}
+
+func (c *clientImpl) Put(path string, size uint64, r io.Reader) (*ephs_pb.FsEntry, error) {
+ ctx, cancel := context.WithTimeout(c.defaultCtx, c.defaultTimeout)
+ defer cancel()
+
+ return c.PutContext(ctx, path, size, r)
+}
+
+func (c *clientImpl) PutContext(ctx context.Context, path string, size uint64, r io.Reader) (*ephs_pb.FsEntry, error) {
+ rpc, err := c.grpcClient()
+ if err != nil {
+ return nil, err
+ }
+
+ req := &ephs_pb.PutRequest{
+ Path: path,
+ Size: size,
+ Version: 0,
+ Chunk: make([]byte, min(size, ChunkSize)),
+ }
+ stream, err := rpc.Put(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ for nw := uint64(0); nw < size; {
+ n, err := r.Read(req.Chunk)
+ if n == 0 || err == io.EOF {
+ break
+ }
+ if err != nil {
+ stream.CloseSend()
+ return nil, err
+ }
+ req.Chunk = req.Chunk[:n]
+ err = stream.Send(req)
+ if err != nil {
+ return nil, err
+ }
+ nw += uint64(n)
+ if err == io.EOF {
+ break
+ }
+ }
+
+ response, err := stream.CloseAndRecv()
+ if err != nil && err != io.EOF {
+ return nil, err
+ }
+
+ if response != nil && response.GetEntry() != nil {
+ return response.GetEntry(), nil
+ } else {
+ return nil, errors.New("upload succeeded but received nil response")
+ }
+}
+
+func (c *clientImpl) Watch(ctx context.Context, path string) (<-chan *ephs_pb.WatchResponse, error) {
+ rpc, err := c.grpcClient()
+ if err != nil {
+ return nil, err
+ }
+
+ req := &ephs_pb.GetRequest{
+ Path: path,
+ }
+ stream, err := rpc.Watch(ctx, req)
+ if err != nil {
+ return nil, err
+ }
+
+ ch := make(chan *ephs_pb.WatchResponse)
+
+ go c.watch(ctx, path, stream, ch)
+
+ return ch, nil
+}
+
+func (c *clientImpl) watch(origCtx context.Context, origPath string, stream ephs_pb.Ephs_WatchClient, ch chan *ephs_pb.WatchResponse) {
+ defer close(ch)
+
+ for {
+ msg, err := stream.Recv()
+ if err != nil {
+ if status, ok := status.FromError(err); ok && status.Code() == codes.Unavailable {
+ stream.CloseSend()
+
+ if rpc, err := c.grpcClient(); err == nil {
+ stream, err = rpc.Watch(origCtx, &ephs_pb.GetRequest{Path: origPath})
+ if err == nil {
+ log.Default().Noticef("reconnected watcher with new stream")
+ continue
+ } else {
+ log.Default().Errorf("error reestablishing watch stream: %v", err)
+ }
+ } else {
+ log.Default().Errorf("error reconnecting: %v", err)
+ }
+ }
+ log.Default().Errorf("error receiving watch stream: %v", err)
+ return
+ }
+
+ timer := time.NewTimer(c.defaultTimeout)
+ select {
+ case ch <- msg:
+ // do nothing - we've written to the channel and that's all we require
+ case <-timer.C:
+ return
+ }
+ }
+}
+
+func (c *clientImpl) grpcClient() (ephs_pb.EphsClient, error) {
+ var err error
+ serverId := mtls.NewServiceIdentity("ephs")
+
+ if c.client == nil {
+ c.client, err = grpc.NewGrpcClient(c.defaultCtx, serverId, c.id,
+ grpc.WithConnectionFactory(&grpc.QUICConnectionFactory{
+ QUICConfig: ephsQuicConfig.Clone(),
+ }))
+ if err != nil {
+ return nil, fmt.Errorf("error creating grpc client: %T: %v", err, err)
+ }
+ }
+
+ deadline, ok := c.defaultCtx.Deadline()
+ if !ok {
+ deadline = time.Now().Add(60 * time.Second)
+ }
+ var (
+ conn *grpc.ClientConn
+ lastErr error
+ )
+
+ for time.Now().Before(deadline) {
+ conn, lastErr = c.client.Conn()
+ if lastErr == nil {
+ break
+ }
+ log.Default().Warningf("error establishing grpc connection to ephs server, retrying in 1s: %T: %v", lastErr, lastErr)
+ select {
+ case <-c.defaultCtx.Done():
+ log.Default().Error(c.defaultCtx.Err())
+ return nil, c.defaultCtx.Err()
+ case <-time.NewTimer(1 * time.Second).C:
+ }
+ }
+ if lastErr != nil {
+ return nil, lastErr
+ }
+
+ confFsCl := ephs_pb.NewEphsClient(conn)
+ return confFsCl, nil
+}
+
+func humanFilesize(s uint64) string {
+ if s > uint64(0.9*math.Pow(2, 40)) {
+ return fmt.Sprintf("%.2f TiB", float64(s)/math.Pow(2, 40))
+ } else if s > uint64(0.9*math.Pow(2, 30)) {
+ return fmt.Sprintf("%.2f GiB", float64(s)/math.Pow(2, 30))
+ } else if s > uint64(0.9*math.Pow(2, 20)) {
+ return fmt.Sprintf("%.2f MiB", float64(s)/math.Pow(2, 20))
+ } else if s > uint64(0.9*math.Pow(2, 10)) {
+ return fmt.Sprintf("%.2f KiB", float64(s)/math.Pow(2, 10))
+ }
+ return fmt.Sprintf("%d bytes", s)
+}
--- /dev/null
+package servicer
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "os"
+
+ "gopkg.in/yaml.v3"
+
+ "go.fuhry.dev/runtime/utils/stringmatch"
+)
+
+type AclRule struct {
+ Principal *stringmatch.NewSyntaxMatchRule `yaml:"principal"`
+ Key *stringmatch.NewSyntaxMatchRule `yaml:"key"`
+ Invert bool `yaml:"invert"`
+}
+
+type Acl struct {
+ Rules []*AclRule `yaml:"rules"`
+}
+
+func LoadAcl(filePath string) (*Acl, error) {
+ fp, err := os.OpenFile(filePath, os.O_RDONLY, os.FileMode(0))
+ if err != nil {
+ return nil, err
+ }
+ defer fp.Close()
+
+ acl := &Acl{}
+ decoder := yaml.NewDecoder(fp)
+ if err := decoder.Decode(acl); err != nil {
+ return nil, err
+ }
+
+ if err := acl.Validate(); err != nil {
+ return nil, err
+ }
+
+ return acl, nil
+}
+
+func LoadAclString(text string) (*Acl, error) {
+ buf := bytes.NewBufferString(text)
+ acl := &Acl{}
+ decoder := yaml.NewDecoder(buf)
+ if err := decoder.Decode(acl); err != nil {
+ return nil, err
+ }
+
+ if err := acl.Validate(); err != nil {
+ return nil, err
+ }
+
+ return acl, nil
+}
+
+func (r *AclRule) Match(principal, key string) bool {
+ if !r.Principal.Match(principal) {
+ return false
+ }
+
+ keyMatcher, err := r.Key.Matcher()
+ if err != nil {
+ return false
+ }
+
+ keyMatcher = keyMatcher.Sub(map[string]string{"principal": principal})
+
+ return keyMatcher.Match(key)
+}
+
+func (a *Acl) Check(principal, key string) bool {
+ for _, r := range a.Rules {
+ if r.Match(principal, key) {
+ return !r.Invert
+ }
+ }
+
+ return false
+}
+
+func (a *Acl) Validate() error {
+ if len(a.Rules) < 1 {
+ return errors.New("access control list is empty")
+ }
+
+ var errs []error
+ for i, rule := range a.Rules {
+ if rule.Key == nil {
+ errs = append(errs, fmt.Errorf("rule %d missing key", i))
+ } else {
+ if _, err := rule.Key.Matcher(); err != nil {
+ errs = append(errs, fmt.Errorf("rule %d key: %v", i, err))
+ }
+ }
+
+ if rule.Principal == nil {
+ errs = append(errs, fmt.Errorf("rule %d missing principal", i))
+ } else {
+ if _, err := rule.Principal.Matcher(); err != nil {
+ errs = append(errs, fmt.Errorf("rule %d principal: %v", i, err))
+ }
+ }
+ }
+
+ return errors.Join(errs...)
+}
--- /dev/null
+package servicer
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+const testRules = `
+rules:
+ - principal:
+ exact: bob
+ key:
+ exact: /{{principal}}/can/write/this
+ - principal:
+ any: true
+ key:
+ exact: /writable/by/self/{{principal}}
+ - principal:
+ or:
+ - exact: bob
+ - exact: suzie
+ key:
+ prefix: /or/test/{{principal}}/
+`
+
+func TestAcl(t *testing.T) {
+ type testCase struct {
+ description, princ, key string
+ expect bool
+ }
+
+ acl, err := LoadAclString(testRules)
+ assert.NoError(t, err)
+
+ testCases := []testCase{
+ {
+ "bob has access to /bob/can/write/this",
+ "bob",
+ "/bob/can/write/this",
+ true,
+ },
+ {
+ "bob does not have access to /suzie/can/write/this",
+ "bob",
+ "/suzie/can/write/this",
+ false,
+ },
+ {
+ "suzie does not have access to /suzie/can/write/this",
+ "suzie",
+ "/suzie/can/write/this",
+ false,
+ },
+ {
+ "bob has access to /writable/by/self/bob",
+ "bob",
+ "/writable/by/self/bob",
+ true,
+ },
+ {
+ "suzie has access to /writable/by/self/suzie",
+ "suzie",
+ "/writable/by/self/suzie",
+ true,
+ },
+ {
+ "bob has access to /writable/by/self/suzie",
+ "suzie",
+ "/writable/by/self/suzie",
+ true,
+ },
+ {
+ "bob has access to /or/test/bob/foo",
+ "bob",
+ "/or/test/bob/foo",
+ true,
+ },
+ {
+ "suzie has access to /or/test/suzie/foo",
+ "suzie",
+ "/or/test/suzie/foo",
+ true,
+ },
+ {
+ "frank does not have access to /or/test/frank/foo",
+ "frank",
+ "/or/test/frank/foo",
+ false,
+ },
+ {
+ "bob does not have access /no/rule/for/this",
+ "bob",
+ "/no/rule/for/this",
+ false,
+ },
+ }
+
+ for i, tc := range testCases {
+ assert.Equal(t, tc.expect, acl.Check(tc.princ, tc.key),
+ "test case %d: %s", i, tc.description)
+ }
+}
--- /dev/null
+package servicer
+
+import (
+ "bytes"
+ "compress/gzip"
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "math/rand"
+ "regexp"
+
+ "github.com/minio/minio-go/v7"
+ "go.fuhry.dev/runtime/ephs"
+ ephs_proto "go.fuhry.dev/runtime/proto/service/ephs"
+
+ "go.fuhry.dev/runtime/utils/log"
+)
+
+const LargeFileThreshold = ephs.ChunkSize
+
+type FsEntry struct {
+ *ephs_proto.FsEntry
+}
+
+var ErrDirectory = errors.New("is a directory")
+var ErrUnknownCompression = errors.New("file compressed with unknown scheme")
+
+var RegexpLargeFileKey = regexp.MustCompile(`^[0-9A-Za-z_-]{64}$`)
+
+func (e *FsEntry) GetContents(ctx context.Context, s3 *minio.Client) (io.Reader, error) {
+ switch {
+ case e.FsEntry.Content.GetDirectory() != nil:
+ return nil, ErrDirectory
+ case e.FsEntry.Content.GetFile() != nil:
+ return e.readFile()
+ case e.FsEntry.Content.GetLargeFile() != nil:
+ return e.readLargeFile(ctx, s3)
+ default:
+ return nil, errors.New("file content not populated with any known type")
+ }
+}
+
+func (e *FsEntry) SetContents(ctx context.Context, s3 *minio.Client, r io.Reader, size uint64) error {
+ if e.FsEntry.Content.GetDirectory() != nil {
+ return ErrDirectory
+ }
+
+ var oldLargeFile ephs_proto.FsEntry_LargeFile
+ mustCleanupLargeFile := e.FsEntry.Content.GetLargeFile() != nil
+ if mustCleanupLargeFile {
+ oldLargeFile = *e.FsEntry.Content.GetLargeFile()
+ }
+
+ var err error
+ if size >= LargeFileThreshold {
+ err = e.writeLargeFile(ctx, s3, r, size)
+ } else {
+ err = e.writeFile(r, size)
+ }
+ if err != nil {
+ return err
+ }
+
+ if mustCleanupLargeFile {
+ log.Default().Infof("mustCleanupLargeFile: %+v", oldLargeFile)
+
+ err = e.cleanupOldLargeFile(ctx, s3, &oldLargeFile)
+ if err != nil {
+ log.Default().Alert(
+ "error while cleaning up old large file with key %q: %v",
+ oldLargeFile.Key,
+ err)
+ }
+ }
+
+ return nil
+}
+
+func (e *FsEntry) DeleteContents(ctx context.Context, s3 *minio.Client) error {
+ switch {
+ case e.FsEntry.Content.GetDirectory() != nil:
+ return ErrDirectory
+ case e.FsEntry.Content.GetFile() != nil:
+ e.FsEntry.Content = nil
+ return nil
+ case e.FsEntry.Content.GetLargeFile() != nil:
+ return e.cleanupOldLargeFile(ctx, s3, e.Content.GetLargeFile())
+ default:
+ return errors.New("file content not populated with any known type")
+ }
+}
+
+func (e *FsEntry) readFile() (io.Reader, error) {
+ file := e.FsEntry.Content.GetFile()
+ if file == nil {
+ return nil, errors.New("internal error: readFile() called but file is unpopulated")
+ }
+
+ switch file.Compression {
+ case ephs_proto.FsEntry_File_UNCOMPRESSED:
+ return bytes.NewBuffer(file.GetContent()), nil
+ case ephs_proto.FsEntry_File_GZIP:
+ return gzip.NewReader(bytes.NewBuffer(file.GetContent()))
+ default:
+ return nil, ErrUnknownCompression
+ }
+}
+
+func (e *FsEntry) readLargeFile(ctx context.Context, s3 *minio.Client) (io.Reader, error) {
+ if e.Content == nil || e.Content.GetLargeFile() == nil {
+ return nil, errors.New("not a large file")
+ }
+
+ key := e.Content.GetLargeFile().GetKey()
+ if !RegexpLargeFileKey.MatchString(key) {
+ return nil, errors.New("large file key contains invalid characters or is the wrong length")
+ }
+
+ return s3.GetObject(ctx, s3Bucket, s3Prefix+key, minio.GetObjectOptions{})
+}
+
+func (e *FsEntry) writeFile(r io.Reader, size uint64) error {
+ var compression = ephs_proto.FsEntry_File_GZIP
+
+ if f := e.FsEntry.Content.GetFile(); f != nil {
+ compression = f.Compression
+ }
+
+ f := &ephs_proto.FsEntry_File{
+ Compression: compression,
+ }
+
+ switch compression {
+ case ephs_proto.FsEntry_File_UNCOMPRESSED:
+ var err error
+ f.Content, err = io.ReadAll(r)
+ if err != nil {
+ return err
+ }
+ if uint64(len(f.Content)) != size {
+ return fmt.Errorf("underrun writing file: expected %d bytes, wrote %d", size, len(f.Content))
+ }
+ case ephs_proto.FsEntry_File_GZIP:
+ buf := &bytes.Buffer{}
+ gz := gzip.NewWriter(buf)
+ nw, _ := io.Copy(gz, r)
+ gz.Close()
+ f.Content = append(f.Content, buf.Bytes()...)
+
+ if nw != int64(size) {
+ return fmt.Errorf("underrun writing file: expected %d bytes, wrote %d", size, nw)
+ }
+ }
+
+ e.FsEntry.Content = &ephs_proto.FsEntry_Content{
+ Content: &ephs_proto.FsEntry_Content_File{
+ File: f,
+ },
+ }
+
+ return nil
+}
+
+func (e *FsEntry) writeLargeFile(ctx context.Context, s3 *minio.Client, r io.Reader, size uint64) error {
+ key := newLargeFileKey()
+
+ log.Default().Noticef("attempting to PutObject into bucket %s at key %s", s3Bucket, key)
+ upload, err := s3.PutObject(ctx, s3Bucket, s3Prefix+key, r, int64(size), minio.PutObjectOptions{})
+ if err != nil {
+ log.Default().Error(err)
+ return err
+ }
+
+ log.Default().Infof("upload info: %+v", upload)
+
+ e.FsEntry.Content = &ephs_proto.FsEntry_Content{
+ Content: &ephs_proto.FsEntry_Content_LargeFile{
+ LargeFile: &ephs_proto.FsEntry_LargeFile{
+ Key: key,
+ },
+ },
+ }
+
+ return nil
+}
+
+func (e *FsEntry) cleanupOldLargeFile(ctx context.Context, s3 *minio.Client, lf *ephs_proto.FsEntry_LargeFile) error {
+ key := lf.GetKey()
+ if !RegexpLargeFileKey.MatchString(key) {
+ return errors.New("large file key contains invalid characters or is the wrong length")
+ }
+
+ return s3.RemoveObject(ctx, s3Bucket, s3Prefix+key, minio.RemoveObjectOptions{})
+}
+
+func newLargeFileKey() string {
+ const charset = `abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-`
+ var out []byte
+ for i := 0; i < 64; i++ {
+ out = append(out, charset[rand.Intn(len(charset))])
+ }
+ return string(out)
+}
--- /dev/null
+package servicer
+
+import (
+ "errors"
+ "flag"
+
+ "github.com/minio/minio-go/v7"
+ "github.com/minio/minio-go/v7/pkg/credentials"
+ "go.fuhry.dev/runtime/constants"
+)
+
+var s3Endpoint = "s3." + constants.WebServicesDomain
+var s3Bucket = "ephs"
+var s3Prefix = ""
+
+func WithAWSEnvCredentials() Option {
+ return &genericOption{
+ apply: func(s *ephsServicer) error {
+ s.s3Creds = credentials.NewEnvAWS()
+ return nil
+ },
+ }
+}
+
+func WithAWSCredentialFile(filename string) Option {
+ return &genericOption{
+ apply: func(s *ephsServicer) error {
+ s.s3Creds = credentials.NewFileAWSCredentials(filename, "default")
+ return nil
+ },
+ }
+}
+
+func (s *ephsServicer) newS3Client() (*minio.Client, error) {
+ if !flag.Parsed() {
+ return nil, errors.New("flags not yet parsed")
+ }
+
+ return minio.New(s3Endpoint, &minio.Options{
+ Creds: s.s3Creds,
+ Secure: true,
+ })
+}
+
+func init() {
+ flag.StringVar(&s3Endpoint, "ephs.s3-endpoint", s3Endpoint, "S3 endpoint")
+ flag.StringVar(&s3Bucket, "ephs.s3-bucket", s3Bucket, "S3 bucket name")
+ flag.StringVar(&s3Prefix, "ephs.s3-prefix", s3Prefix, "S3 object prefix")
+}
--- /dev/null
+package servicer
+
+import (
+ "bytes"
+ "context"
+ "errors"
+ "flag"
+ "fmt"
+ "io"
+ "path"
+ "slices"
+ "strings"
+ "sync"
+
+ "github.com/minio/minio-go/v7"
+ "github.com/minio/minio-go/v7/pkg/credentials"
+ etcd_client "go.etcd.io/etcd/client/v3"
+ etcd_concurrency "go.etcd.io/etcd/client/v3/concurrency"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/peer"
+ "google.golang.org/grpc/status"
+ "google.golang.org/protobuf/proto"
+ "google.golang.org/protobuf/types/known/emptypb"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ "go.fuhry.dev/runtime/ephs"
+ "go.fuhry.dev/runtime/grpc"
+ "go.fuhry.dev/runtime/mtls"
+ "go.fuhry.dev/runtime/mtls/fsnotify"
+ ephs_proto "go.fuhry.dev/runtime/proto/service/ephs"
+ "go.fuhry.dev/runtime/sd"
+ "go.fuhry.dev/runtime/utils/hostname"
+ "go.fuhry.dev/runtime/utils/log"
+)
+
+type Option interface {
+ Apply(*ephsServicer) error
+}
+
+type genericOption struct {
+ apply func(*ephsServicer) error
+}
+
+type msgWithPath interface {
+ GetPath() string
+}
+
+type ephsServicer struct {
+ ephs_proto.EphsServer
+
+ acl *Acl
+ logger log.Logger
+ clientMu sync.Mutex
+ clients map[string]*etcd_client.Client
+ s3Creds *credentials.Credentials
+ s3Client *minio.Client
+}
+
+var cell string
+
+func (o *genericOption) Apply(servicer *ephsServicer) error {
+ return o.apply(servicer)
+}
+
+func WithAcl(acl *Acl) Option {
+ return &genericOption{
+ apply: func(s *ephsServicer) error {
+ s.acl = acl
+ return nil
+ },
+ }
+}
+
+func (s *ephsServicer) watchAcl(aclFile string) {
+ handle := func(filePath string, op fsnotify.Op) {
+ if filePath != aclFile {
+ return
+ }
+ if acl, err := LoadAcl(filePath); err == nil {
+ s.acl = acl
+ s.logger.Noticef("reloaded ACLs from %s", filePath)
+ } else {
+ s.logger.Warningf("error reloading ACLs from %s, rules not reloaded: %v", filePath, err)
+ }
+ }
+ fsnotify.NotifyPath(aclFile, handle)
+}
+
+func WithAclFile(aclFile string) Option {
+ return &genericOption{
+ apply: func(s *ephsServicer) error {
+ acl, err := LoadAcl(aclFile)
+ if err != nil {
+ return err
+ }
+ s.logger.Noticef("loaded %d ACL rules from %s",
+ len(acl.Rules),
+ aclFile)
+ s.acl = acl
+ s.watchAcl(aclFile)
+ return nil
+ },
+ }
+}
+
+func NewEphsServicer(opts ...Option) (ephs_proto.EphsServer, error) {
+ serv := &ephsServicer{
+ clients: make(map[string]*etcd_client.Client),
+ logger: log.Default().WithPrefix("ephsServicer"),
+ }
+ if _, err := serv.clientForCell(cell); err != nil {
+ return nil, err
+ }
+ for _, o := range opts {
+ if err := o.Apply(serv); err != nil {
+ return nil, err
+ }
+ }
+
+ s3, err := serv.newS3Client()
+ if err != nil {
+ return nil, err
+ }
+ serv.s3Client = s3
+
+ return serv, nil
+}
+
+func (s *ephsServicer) clientForCell(cell string) (*etcd_client.Client, error) {
+ s.clientMu.Lock()
+ defer s.clientMu.Unlock()
+
+ if _, ok := s.clients[cell]; !ok {
+ client, err := sd.NewEtcdClient(
+ mtls.DefaultIdentity(),
+ cell,
+ )
+ if err != nil {
+ return nil, err
+ }
+ s.clients[cell] = client
+ }
+
+ return s.clients[cell], nil
+}
+
+func (s *ephsServicer) checkPath(ctx context.Context, path string) (string, error) {
+ if !ephs.IsEphsPath(path) {
+ return "", status.Errorf(codes.InvalidArgument, "path %q must start with "+ephs.KeyPrefix, path)
+ }
+
+ parts := strings.Split(path, "/")
+
+ if len(parts) < 3 {
+ return "", status.Errorf(
+ codes.InvalidArgument,
+ "path must be at least 3 levels deep: %q",
+ path)
+ }
+
+ for i, part := range parts {
+ if (i > 0 && part == "") || part == "." || part == ".." {
+ return "", status.Errorf(codes.InvalidArgument,
+ "path %q is invalid: may not contain empty elements, '.' or '..'", path)
+ }
+ }
+
+ ident, err := identityFromContext(ctx)
+ if err != nil {
+ return "", err
+ }
+
+ aclPath := strings.Join(parts[2:], "/")
+
+ if s.acl != nil {
+ if !s.acl.Check(ident, aclPath) {
+ return "", status.Errorf(
+ codes.PermissionDenied,
+ "access to path %q denied for %q",
+ aclPath,
+ ident)
+ }
+ }
+
+ if parts[2] == "local" {
+ parts[2] = cell
+ }
+
+ return strings.Join(parts, "/"), nil
+}
+
+func (s *ephsServicer) Stat(ctx context.Context, req *ephs_proto.GetRequest) (*ephs_proto.StatResponse, error) {
+ s.logRequest(ctx, "Stat", req)
+ entry, err := s.getPath(ctx, req.Path)
+ if err != nil {
+ return nil, err
+ }
+
+ return &ephs_proto.StatResponse{
+ Entry: entry,
+ }, nil
+}
+
+func (s *ephsServicer) getPath(ctx context.Context, path string) (*ephs_proto.FsEntry, error) {
+ var err error
+ if path, err = s.checkPath(ctx, path); err != nil {
+ return nil, err
+ }
+
+ cell := strings.Split(path, "/")[2]
+ etcd, err := s.clientForCell(cell)
+ if err != nil {
+ return nil, err
+ }
+
+ obj, err := etcd.Get(ctx, path)
+ if err != nil {
+ return nil, err
+ }
+
+ if len(obj.Kvs) < 1 {
+ return nil, status.Errorf(
+ codes.NotFound,
+ "object does not exist: %q",
+ path)
+ }
+
+ entry := &ephs_proto.FsEntry{}
+ if err := proto.Unmarshal(obj.Kvs[0].Value, entry); err != nil {
+ return nil, status.Errorf(
+ codes.DataLoss,
+ "failed to unmarshal key %q as %s: %v",
+ obj.Kvs[0].Key,
+ proto.MessageName(entry),
+ err)
+ }
+
+ return entry, nil
+}
+
+func (s *ephsServicer) modifyDirectory(ctx context.Context, path string, apply func(*ephs_proto.FsEntry_Directory, etcd_concurrency.STM) error) error {
+ var err error
+ if path, err = s.checkPath(ctx, path); err != nil {
+ return err
+ }
+
+ cell := strings.Split(path, "/")[2]
+ etcd, err := s.clientForCell(cell)
+ if err != nil {
+ return err
+ }
+
+ txnResp, err := etcd_concurrency.NewSTM(etcd, func(stm etcd_concurrency.STM) error {
+ dirRaw := []byte(stm.Get(path))
+ dir := newDir(ctx)
+ if len(dirRaw) > 0 {
+ if err := proto.Unmarshal(dirRaw, dir); err != nil {
+ return fmt.Errorf("error unmarshaling FsEntry at %q: %v", path, err)
+ }
+ }
+
+ if dir.Content.GetDirectory() == nil {
+ return fmt.Errorf("ephs path %q: not a directory", path)
+ }
+
+ if err = apply(dir.Content.GetDirectory(), stm); err != nil {
+ return fmt.Errorf("error performing atomic directory modification: %v", err)
+ }
+
+ dir.Modified = timestamppb.Now()
+ dir.Version += 1
+
+ if dirRaw, err = proto.Marshal(dir); err != nil {
+ return fmt.Errorf("error re-marshaling FsEntry at %q after calling apply: %v", path, err)
+ }
+
+ stm.Put(path, string(dirRaw))
+
+ return nil
+ })
+
+ s.logger.Debugf("transaction status modifying directory %q: %+v", path, txnResp)
+
+ return err
+}
+
+func (s *ephsServicer) logRequest(ctx context.Context, method string, req msgWithPath) {
+ id, err := identityFromContext(ctx)
+ if err != nil {
+ return
+ }
+
+ s.logger.AppendPrefix("]["+id).Infof("%s(%q)", method, req.GetPath())
+}
+
+func (s *ephsServicer) Get(req *ephs_proto.GetRequest, server ephs_proto.Ephs_GetServer) error {
+ ctx := server.Context()
+ s.logRequest(ctx, "Get", req)
+
+ entry, err := s.getPath(ctx, req.Path)
+ if err != nil {
+ return err
+ }
+
+ if entry.Content.GetDirectory() != nil {
+ return status.Errorf(
+ codes.FailedPrecondition,
+ "%q: is a directory",
+ req.Path)
+ }
+
+ entryCopy := *entry
+
+ response := &ephs_proto.GetResponse{
+ Entry: &entryCopy,
+ }
+
+ response.Entry.Content = nil
+
+ ent := &FsEntry{entry}
+ reader, err := ent.GetContents(ctx, s.s3Client)
+ if err != nil {
+ return err
+ }
+
+ writer := &responseWriter{server: server, response: response}
+ n, err := io.Copy(writer, reader)
+ if err != nil {
+ s.logger.Error(err)
+ return err
+ }
+ s.logger.V(1).Noticef("Get(%q): wrote %d bytes", req.Path, n)
+
+ return nil
+}
+
+func (s *ephsServicer) mkdir(ctx context.Context, pathName string, recursive bool) error {
+ parts := strings.Split(pathName, "/")
+
+ cell := parts[2]
+ etcd, err := s.clientForCell(cell)
+ if err != nil {
+ return err
+ }
+
+ txn, err := etcd_concurrency.NewSTM(etcd, func(stm etcd_concurrency.STM) error {
+ var (
+ dir *ephs_proto.FsEntry = newDir(ctx)
+ err error
+ )
+ for i := 3; i < len(parts); i++ {
+ parent := strings.Join(parts[:i], "/")
+
+ dirRaw := []byte(stm.Get(parent))
+ if len(dirRaw) == 0 {
+ if !recursive {
+ return status.Errorf(codes.NotFound, "cannot mkdir %q: parent directory %q does not exist and recursion is disabled", pathName, parent)
+ }
+
+ dir = newDir(ctx)
+ } else {
+ err = proto.Unmarshal(dirRaw, dir)
+ if err != nil {
+ return status.Errorf(codes.Internal,
+ "error unmarshaling parent directory %q: %T: %v",
+ parent, err, err)
+ }
+
+ if dir.Content.GetDirectory() == nil {
+ return status.Errorf(
+ codes.FailedPrecondition,
+ "cannot mkdir %q: parent item %q is not a directory (raw len: %d)",
+ pathName, parent, len(dirRaw))
+ }
+
+ dir.Modified = timestamppb.Now()
+ dir.Version += 1
+ }
+
+ dEnt := dir.Content.GetDirectory()
+
+ dEnt.Entries = append(dEnt.Entries, &ephs_proto.FsEntry_Directory_DirectoryEntry{
+ Name: parts[i],
+ Directory: true,
+ })
+
+ dirRaw, err = proto.Marshal(dir)
+ if err != nil {
+ return status.Errorf(codes.Internal,
+ "error marshaling parent directory %q: %T: %v",
+ parent, err, err)
+ }
+
+ stm.Put(parent, string(dirRaw))
+ }
+
+ entry := []byte(stm.Get(pathName))
+ if len(entry) > 0 {
+ return status.Errorf(codes.AlreadyExists,
+ "cannot create directory %q: already exists",
+ pathName)
+ }
+
+ dirRaw, err := proto.Marshal(newDir(ctx))
+ if err != nil {
+ return status.Errorf(codes.Internal,
+ "error marshaling new directory %q: %T: %v",
+ pathName, err, err)
+ }
+
+ stm.Put(pathName, string(dirRaw))
+ return nil
+ })
+ if err != nil {
+ return err
+ }
+ if !txn.Succeeded {
+ return fmt.Errorf("transaction failed: %+v", txn)
+ }
+
+ return nil
+}
+
+func (s *ephsServicer) MkDir(ctx context.Context, req *ephs_proto.MkDirRequest) (*emptypb.Empty, error) {
+ path, err := s.checkPath(ctx, req.Path)
+ if err != nil {
+ return nil, err
+ }
+
+ return &emptypb.Empty{}, s.mkdir(ctx, path, req.Recursive)
+}
+
+func (s *ephsServicer) Delete(ctx context.Context, req *ephs_proto.DeleteRequest) (*emptypb.Empty, error) {
+ s.logRequest(ctx, "Delete", req)
+ targetPath, err := s.checkPath(ctx, req.Path)
+ if err != nil {
+ return nil, err
+ }
+
+ entry, err := s.getPath(ctx, targetPath)
+ if err != nil {
+ return nil, err
+ }
+
+ fse := &FsEntry{entry}
+ if fse.Content.GetDirectory() != nil {
+ if len(fse.Content.GetDirectory().Entries) > 0 && !req.Recursive {
+ return nil, status.Errorf(codes.FailedPrecondition,
+ "refusing to delete non-empty directory %q without recursion flag set",
+ targetPath)
+ }
+
+ for _, ent := range fse.Content.GetDirectory().Entries {
+ subReq := &ephs_proto.DeleteRequest{
+ Path: path.Join(targetPath, ent.Name),
+ Recursive: true,
+ }
+ _, err = s.Delete(ctx, subReq)
+ if err != nil {
+ return nil, err
+ }
+ }
+ } else {
+ if err := fse.DeleteContents(ctx, s.s3Client); err != nil {
+ return nil, err
+ }
+ }
+
+ dir := path.Dir(targetPath)
+ err = s.modifyDirectory(ctx, dir, func(dEnt *ephs_proto.FsEntry_Directory, stm etcd_concurrency.STM) error {
+ index := -1
+ for i, ent := range dEnt.Entries {
+ if ent.Name == path.Base(targetPath) {
+ index = i
+ break
+ }
+ }
+ if index >= 0 {
+ dEnt.Entries = slices.Delete(dEnt.Entries, index, index+1)
+ }
+ stm.Del(targetPath)
+ return nil
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ return &emptypb.Empty{}, nil
+}
+
+func (s *ephsServicer) Put(server ephs_proto.Ephs_PutServer) error {
+ msg, err := server.Recv()
+ if err != nil {
+ return err
+ }
+
+ ctx := server.Context()
+ s.logRequest(ctx, "Put", msg)
+
+ peerId, err := identityFromContext(ctx)
+ if err != nil {
+ return err
+ }
+
+ targetPath, err := s.checkPath(ctx, msg.Path)
+ if err != nil {
+ return err
+ }
+
+ dir := path.Dir(targetPath)
+ _, err = s.getPath(ctx, dir)
+ if err != nil {
+ if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
+ return status.Errorf(
+ codes.FailedPrecondition,
+ "cannot write %q: parent directory %q does not exist",
+ targetPath, dir)
+ }
+ return err
+ }
+
+ pbEnt, err := s.getPath(ctx, targetPath)
+ if err != nil {
+ if status, ok := status.FromError(err); ok && status.Code() == codes.NotFound {
+ // NotFound is only returned after all access checks have succeeded. Ok to
+ // create file.
+ now := timestamppb.Now()
+ pbEnt = &ephs_proto.FsEntry{
+ Created: now,
+ Modified: now,
+ Version: msg.Version - 1,
+ Size: msg.Size,
+ Owner: peerId,
+ }
+ } else {
+ // some other error besides not-found
+ return err
+ }
+ } else {
+ if msg.Version != 0 && msg.Version != (pbEnt.Version+1) {
+ return status.Errorf(
+ codes.FailedPrecondition,
+ "version conflict: attempted to overwrite version %d with %d, "+
+ "version number must be current version + 1",
+ pbEnt.Version,
+ msg.Version)
+ }
+ }
+ ent := &FsEntry{pbEnt}
+ if pbEnt.Content.GetDirectory() != nil {
+ return status.Errorf(codes.FailedPrecondition,
+ "cannot write %q: path already exists and is a directory",
+ targetPath)
+ }
+
+ reader := &putReader{
+ initialMsg: msg,
+ server: server,
+ buf: &bytes.Buffer{},
+ }
+
+ if err := ent.SetContents(ctx, s.s3Client, reader, msg.Size); err != nil {
+ return err
+ }
+
+ ent.FsEntry.Modified = timestamppb.Now()
+ ent.FsEntry.Size = msg.Size
+ ent.FsEntry.Version++
+
+ marshaled, err := proto.Marshal(ent.FsEntry)
+ if err != nil {
+ return err
+ }
+
+ err = s.modifyDirectory(ctx, dir, func(dEnt *ephs_proto.FsEntry_Directory, stm etcd_concurrency.STM) error {
+ stm.Put(targetPath, string(marshaled))
+ base := path.Base(targetPath)
+ for _, ent := range dEnt.Entries {
+ if ent.Name == base {
+ return nil
+ }
+ }
+
+ dEnt.Entries = append(dEnt.Entries, &ephs_proto.FsEntry_Directory_DirectoryEntry{
+ Name: base,
+ Directory: false,
+ })
+
+ return nil
+ })
+
+ if err != nil {
+ _ = ent.DeleteContents(ctx, s.s3Client)
+ return err
+ }
+
+ response := &ephs_proto.StatResponse{
+ Entry: ent.FsEntry,
+ }
+ return server.SendAndClose(response)
+}
+
+func (s *ephsServicer) Watch(req *ephs_proto.GetRequest, server ephs_proto.Ephs_WatchServer) error {
+ ctx := etcd_client.WithRequireLeader(server.Context())
+ s.logRequest(ctx, "Watch", req)
+ path, err := s.checkPath(ctx, req.Path)
+ if err != nil {
+ return err
+ }
+
+ response := &ephs_proto.WatchResponse{
+ Entry: nil,
+ Event: ephs_proto.WatchResponse_DELETE,
+ }
+
+ if resp, err := s.getPath(ctx, path); err == nil {
+ response.Entry = resp
+ response.Event = ephs_proto.WatchResponse_CREATE
+ }
+
+ cell := strings.Split(path, "/")[2]
+ etcd, err := s.clientForCell(cell)
+ if err != nil {
+ return err
+ }
+
+ if err := server.Send(response); err != nil {
+ return err
+ }
+
+ for ctx.Err() == nil {
+ watcher := etcd.Watch(ctx, path)
+ for msg := range watcher {
+ s.logger.Debugf("watcher: got event: %+v", msg)
+ for _, event := range msg.Events {
+ if string(event.Kv.Key) != path {
+ continue
+ }
+
+ response.Entry = &ephs_proto.FsEntry{}
+
+ if event.IsCreate() {
+ response.Event = ephs_proto.WatchResponse_CREATE
+ } else if event.IsModify() {
+ response.Event = ephs_proto.WatchResponse_MODIFY
+ } else if event.Type == etcd_client.EventTypeDelete {
+ response.Entry = nil
+ response.Event = ephs_proto.WatchResponse_DELETE
+ } else {
+ continue
+ }
+
+ if response.Entry != nil {
+ if err = proto.Unmarshal(event.Kv.Value, response.Entry); err != nil {
+ s.logger.Errorf("protobuf marshal error: %v", err)
+ return err
+ }
+ }
+
+ if err := server.Send(response); err != nil {
+ s.logger.Errorf("stream send error: %v", err)
+ return err
+ }
+ }
+ }
+ }
+
+ s.logger.Noticef("Watch(%q) ending: ctx err: %v", path, ctx.Err())
+
+ if errors.Is(ctx.Err(), context.Canceled) {
+ return status.Error(codes.Unavailable, "server is shutting down, please reconnect")
+ }
+
+ return nil
+}
+
+func identityFromContext(ctx context.Context) (string, error) {
+ peer, ok := peer.FromContext(ctx)
+ if !ok {
+ return "", status.Error(codes.Unauthenticated, "who are you??")
+ }
+
+ ident, err := grpc.PeerIdentity(peer)
+ if err != nil {
+ return "", status.Errorf(codes.PermissionDenied, "cannot determine your identity from peer info: %v", peer.AuthInfo)
+ }
+
+ return ident.String(), nil
+}
+
+func newDir(ctx context.Context) *ephs_proto.FsEntry {
+ whoami, _ := identityFromContext(ctx)
+
+ return &ephs_proto.FsEntry{
+ Content: &ephs_proto.FsEntry_Content{
+ Content: &ephs_proto.FsEntry_Content_Directory{
+ Directory: &ephs_proto.FsEntry_Directory{},
+ },
+ },
+ Owner: whoami,
+ Created: timestamppb.Now(),
+ Modified: timestamppb.Now(),
+ Version: 1,
+ }
+}
+
+func init() {
+ flag.StringVar(&cell, "ephs.cell", hostname.DomainName(), "ephs cell")
+}
--- /dev/null
+package servicer
+
+import (
+ "bytes"
+ "fmt"
+
+ ephs_proto "go.fuhry.dev/runtime/proto/service/ephs"
+ "go.fuhry.dev/runtime/utils/log"
+)
+
+type responseWriter struct {
+ server ephs_proto.Ephs_GetServer
+ response *ephs_proto.GetResponse
+}
+
+func (w *responseWriter) Write(p []byte) (int, error) {
+ log.Default().V(3).Debugf("responseWriter: writing %d bytes", len(p))
+ chunkLen := min(len(p), LargeFileThreshold)
+
+ w.response.Chunk = make([]byte, chunkLen)
+ copy(w.response.Chunk, p)
+
+ if err := w.server.Send(w.response); err != nil {
+ log.Default().Error(err)
+ return 0, err
+ }
+
+ log.Default().V(2).Noticef("wrote %d bytes", chunkLen)
+ return chunkLen, nil
+}
+
+type putReader struct {
+ initialMsg *ephs_proto.PutRequest
+ server ephs_proto.Ephs_PutServer
+ buf *bytes.Buffer
+}
+
+func (r *putReader) Read(p []byte) (int, error) {
+ var msg *ephs_proto.PutRequest
+ if r.buf.Len() > 0 {
+ n, err := r.buf.Read(p)
+ return n, err
+ } else if r.initialMsg != nil {
+ msg = r.initialMsg
+ r.initialMsg = nil
+ } else {
+ var err error
+ msg, err = r.server.Recv()
+ if err != nil {
+ return 0, err
+ }
+ }
+
+ nw, err := r.buf.Write(msg.Chunk)
+ if err != nil {
+ return 0, err
+ }
+ if nw != len(msg.Chunk) {
+ return 0, fmt.Errorf(
+ "overrun on bytes.Buffer putReader: expected to write %d bytes, "+
+ "but only wrote %d",
+ len(r.initialMsg.Chunk),
+ nw)
+ }
+
+ return r.buf.Read(p)
+}
--- /dev/null
+PROTO_SRCS := $(wildcard *.proto)
+PROTO_GO_OUTPUT := $(PROTO_SRCS:.proto=.pb.go) $(PROTO_SRCS:.proto=_grpc.pb.go)
+
+$(PROTO_GO_OUTPUT): $(PROTO_SRCS)
+ protoc --go_out=. --go_opt=paths=source_relative \
+ --go-grpc_out=. --go-grpc_opt=paths=source_relative \
+ $(PROTO_SRCS)
+
+pb_go: $(PROTO_GO_OUTPUT)
+
+all: pb_go
+
+clean:
+ rm -fv $(PROTO_GO_OUTPUT)
--- /dev/null
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// protoc-gen-go v1.36.10
+// protoc v6.32.1
+// source: service.proto
+
+package ephs
+
+import (
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+ emptypb "google.golang.org/protobuf/types/known/emptypb"
+ reflect "reflect"
+ sync "sync"
+ unsafe "unsafe"
+)
+
+const (
+ // Verify that this generated code is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+ // Verify that runtime/protoimpl is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+type WatchResponse_Event int32
+
+const (
+ WatchResponse_UNKNOWN WatchResponse_Event = 0
+ WatchResponse_CREATE WatchResponse_Event = 1
+ WatchResponse_MODIFY WatchResponse_Event = 2
+ WatchResponse_DELETE WatchResponse_Event = 3
+)
+
+// Enum value maps for WatchResponse_Event.
+var (
+ WatchResponse_Event_name = map[int32]string{
+ 0: "UNKNOWN",
+ 1: "CREATE",
+ 2: "MODIFY",
+ 3: "DELETE",
+ }
+ WatchResponse_Event_value = map[string]int32{
+ "UNKNOWN": 0,
+ "CREATE": 1,
+ "MODIFY": 2,
+ "DELETE": 3,
+ }
+)
+
+func (x WatchResponse_Event) Enum() *WatchResponse_Event {
+ p := new(WatchResponse_Event)
+ *p = x
+ return p
+}
+
+func (x WatchResponse_Event) String() string {
+ return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
+}
+
+func (WatchResponse_Event) Descriptor() protoreflect.EnumDescriptor {
+ return file_service_proto_enumTypes[0].Descriptor()
+}
+
+func (WatchResponse_Event) Type() protoreflect.EnumType {
+ return &file_service_proto_enumTypes[0]
+}
+
+func (x WatchResponse_Event) Number() protoreflect.EnumNumber {
+ return protoreflect.EnumNumber(x)
+}
+
+// Deprecated: Use WatchResponse_Event.Descriptor instead.
+func (WatchResponse_Event) EnumDescriptor() ([]byte, []int) {
+ return file_service_proto_rawDescGZIP(), []int{6, 0}
+}
+
+type GetRequest struct {
+ state protoimpl.MessageState `protogen:"open.v1"`
+ Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"`
+ unknownFields protoimpl.UnknownFields
+ sizeCache protoimpl.SizeCache
+}
+
+func (x *GetRequest) Reset() {
+ *x = GetRequest{}
+ mi := &file_service_proto_msgTypes[0]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+}
+
+func (x *GetRequest) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*GetRequest) ProtoMessage() {}
+
+func (x *GetRequest) ProtoReflect() protoreflect.Message {
+ mi := &file_service_proto_msgTypes[0]
+ if x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use GetRequest.ProtoReflect.Descriptor instead.
+func (*GetRequest) Descriptor() ([]byte, []int) {
+ return file_service_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *GetRequest) GetPath() string {
+ if x != nil {
+ return x.Path
+ }
+ return ""
+}
+
+type DeleteRequest struct {
+ state protoimpl.MessageState `protogen:"open.v1"`
+ Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"`
+ Recursive bool `protobuf:"varint,2,opt,name=recursive,proto3" json:"recursive,omitempty"`
+ unknownFields protoimpl.UnknownFields
+ sizeCache protoimpl.SizeCache
+}
+
+func (x *DeleteRequest) Reset() {
+ *x = DeleteRequest{}
+ mi := &file_service_proto_msgTypes[1]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+}
+
+func (x *DeleteRequest) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*DeleteRequest) ProtoMessage() {}
+
+func (x *DeleteRequest) ProtoReflect() protoreflect.Message {
+ mi := &file_service_proto_msgTypes[1]
+ if x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use DeleteRequest.ProtoReflect.Descriptor instead.
+func (*DeleteRequest) Descriptor() ([]byte, []int) {
+ return file_service_proto_rawDescGZIP(), []int{1}
+}
+
+func (x *DeleteRequest) GetPath() string {
+ if x != nil {
+ return x.Path
+ }
+ return ""
+}
+
+func (x *DeleteRequest) GetRecursive() bool {
+ if x != nil {
+ return x.Recursive
+ }
+ return false
+}
+
+type MkDirRequest struct {
+ state protoimpl.MessageState `protogen:"open.v1"`
+ Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"`
+ Recursive bool `protobuf:"varint,2,opt,name=recursive,proto3" json:"recursive,omitempty"`
+ unknownFields protoimpl.UnknownFields
+ sizeCache protoimpl.SizeCache
+}
+
+func (x *MkDirRequest) Reset() {
+ *x = MkDirRequest{}
+ mi := &file_service_proto_msgTypes[2]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+}
+
+func (x *MkDirRequest) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*MkDirRequest) ProtoMessage() {}
+
+func (x *MkDirRequest) ProtoReflect() protoreflect.Message {
+ mi := &file_service_proto_msgTypes[2]
+ if x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use MkDirRequest.ProtoReflect.Descriptor instead.
+func (*MkDirRequest) Descriptor() ([]byte, []int) {
+ return file_service_proto_rawDescGZIP(), []int{2}
+}
+
+func (x *MkDirRequest) GetPath() string {
+ if x != nil {
+ return x.Path
+ }
+ return ""
+}
+
+func (x *MkDirRequest) GetRecursive() bool {
+ if x != nil {
+ return x.Recursive
+ }
+ return false
+}
+
+type GetResponse struct {
+ state protoimpl.MessageState `protogen:"open.v1"`
+ Entry *FsEntry `protobuf:"bytes,1,opt,name=entry,proto3" json:"entry,omitempty"`
+ Chunk []byte `protobuf:"bytes,2,opt,name=chunk,proto3" json:"chunk,omitempty"`
+ unknownFields protoimpl.UnknownFields
+ sizeCache protoimpl.SizeCache
+}
+
+func (x *GetResponse) Reset() {
+ *x = GetResponse{}
+ mi := &file_service_proto_msgTypes[3]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+}
+
+func (x *GetResponse) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*GetResponse) ProtoMessage() {}
+
+func (x *GetResponse) ProtoReflect() protoreflect.Message {
+ mi := &file_service_proto_msgTypes[3]
+ if x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use GetResponse.ProtoReflect.Descriptor instead.
+func (*GetResponse) Descriptor() ([]byte, []int) {
+ return file_service_proto_rawDescGZIP(), []int{3}
+}
+
+func (x *GetResponse) GetEntry() *FsEntry {
+ if x != nil {
+ return x.Entry
+ }
+ return nil
+}
+
+func (x *GetResponse) GetChunk() []byte {
+ if x != nil {
+ return x.Chunk
+ }
+ return nil
+}
+
+type StatResponse struct {
+ state protoimpl.MessageState `protogen:"open.v1"`
+ Entry *FsEntry `protobuf:"bytes,1,opt,name=entry,proto3" json:"entry,omitempty"`
+ unknownFields protoimpl.UnknownFields
+ sizeCache protoimpl.SizeCache
+}
+
+func (x *StatResponse) Reset() {
+ *x = StatResponse{}
+ mi := &file_service_proto_msgTypes[4]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+}
+
+func (x *StatResponse) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*StatResponse) ProtoMessage() {}
+
+func (x *StatResponse) ProtoReflect() protoreflect.Message {
+ mi := &file_service_proto_msgTypes[4]
+ if x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use StatResponse.ProtoReflect.Descriptor instead.
+func (*StatResponse) Descriptor() ([]byte, []int) {
+ return file_service_proto_rawDescGZIP(), []int{4}
+}
+
+func (x *StatResponse) GetEntry() *FsEntry {
+ if x != nil {
+ return x.Entry
+ }
+ return nil
+}
+
+type PutRequest struct {
+ state protoimpl.MessageState `protogen:"open.v1"`
+ Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"`
+ Version uint64 `protobuf:"varint,2,opt,name=version,proto3" json:"version,omitempty"`
+ Size uint64 `protobuf:"varint,3,opt,name=size,proto3" json:"size,omitempty"`
+ Chunk []byte `protobuf:"bytes,4,opt,name=chunk,proto3" json:"chunk,omitempty"`
+ unknownFields protoimpl.UnknownFields
+ sizeCache protoimpl.SizeCache
+}
+
+func (x *PutRequest) Reset() {
+ *x = PutRequest{}
+ mi := &file_service_proto_msgTypes[5]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+}
+
+func (x *PutRequest) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*PutRequest) ProtoMessage() {}
+
+func (x *PutRequest) ProtoReflect() protoreflect.Message {
+ mi := &file_service_proto_msgTypes[5]
+ if x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use PutRequest.ProtoReflect.Descriptor instead.
+func (*PutRequest) Descriptor() ([]byte, []int) {
+ return file_service_proto_rawDescGZIP(), []int{5}
+}
+
+func (x *PutRequest) GetPath() string {
+ if x != nil {
+ return x.Path
+ }
+ return ""
+}
+
+func (x *PutRequest) GetVersion() uint64 {
+ if x != nil {
+ return x.Version
+ }
+ return 0
+}
+
+func (x *PutRequest) GetSize() uint64 {
+ if x != nil {
+ return x.Size
+ }
+ return 0
+}
+
+func (x *PutRequest) GetChunk() []byte {
+ if x != nil {
+ return x.Chunk
+ }
+ return nil
+}
+
+type WatchResponse struct {
+ state protoimpl.MessageState `protogen:"open.v1"`
+ Entry *FsEntry `protobuf:"bytes,1,opt,name=entry,proto3" json:"entry,omitempty"`
+ Event WatchResponse_Event `protobuf:"varint,2,opt,name=event,proto3,enum=fuhry.runtime.service.ephs.WatchResponse_Event" json:"event,omitempty"`
+ unknownFields protoimpl.UnknownFields
+ sizeCache protoimpl.SizeCache
+}
+
+func (x *WatchResponse) Reset() {
+ *x = WatchResponse{}
+ mi := &file_service_proto_msgTypes[6]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+}
+
+func (x *WatchResponse) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*WatchResponse) ProtoMessage() {}
+
+func (x *WatchResponse) ProtoReflect() protoreflect.Message {
+ mi := &file_service_proto_msgTypes[6]
+ if x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use WatchResponse.ProtoReflect.Descriptor instead.
+func (*WatchResponse) Descriptor() ([]byte, []int) {
+ return file_service_proto_rawDescGZIP(), []int{6}
+}
+
+func (x *WatchResponse) GetEntry() *FsEntry {
+ if x != nil {
+ return x.Entry
+ }
+ return nil
+}
+
+func (x *WatchResponse) GetEvent() WatchResponse_Event {
+ if x != nil {
+ return x.Event
+ }
+ return WatchResponse_UNKNOWN
+}
+
+var File_service_proto protoreflect.FileDescriptor
+
+const file_service_proto_rawDesc = "" +
+ "\n" +
+ "\rservice.proto\x12\x1afuhry.runtime.service.ephs\x1a\x1bgoogle/protobuf/empty.proto\x1a\vtypes.proto\" \n" +
+ "\n" +
+ "GetRequest\x12\x12\n" +
+ "\x04path\x18\x01 \x01(\tR\x04path\"A\n" +
+ "\rDeleteRequest\x12\x12\n" +
+ "\x04path\x18\x01 \x01(\tR\x04path\x12\x1c\n" +
+ "\trecursive\x18\x02 \x01(\bR\trecursive\"@\n" +
+ "\fMkDirRequest\x12\x12\n" +
+ "\x04path\x18\x01 \x01(\tR\x04path\x12\x1c\n" +
+ "\trecursive\x18\x02 \x01(\bR\trecursive\"^\n" +
+ "\vGetResponse\x129\n" +
+ "\x05entry\x18\x01 \x01(\v2#.fuhry.runtime.service.ephs.FsEntryR\x05entry\x12\x14\n" +
+ "\x05chunk\x18\x02 \x01(\fR\x05chunk\"I\n" +
+ "\fStatResponse\x129\n" +
+ "\x05entry\x18\x01 \x01(\v2#.fuhry.runtime.service.ephs.FsEntryR\x05entry\"d\n" +
+ "\n" +
+ "PutRequest\x12\x12\n" +
+ "\x04path\x18\x01 \x01(\tR\x04path\x12\x18\n" +
+ "\aversion\x18\x02 \x01(\x04R\aversion\x12\x12\n" +
+ "\x04size\x18\x03 \x01(\x04R\x04size\x12\x14\n" +
+ "\x05chunk\x18\x04 \x01(\fR\x05chunk\"\xcb\x01\n" +
+ "\rWatchResponse\x129\n" +
+ "\x05entry\x18\x01 \x01(\v2#.fuhry.runtime.service.ephs.FsEntryR\x05entry\x12E\n" +
+ "\x05event\x18\x02 \x01(\x0e2/.fuhry.runtime.service.ephs.WatchResponse.EventR\x05event\"8\n" +
+ "\x05Event\x12\v\n" +
+ "\aUNKNOWN\x10\x00\x12\n" +
+ "\n" +
+ "\x06CREATE\x10\x01\x12\n" +
+ "\n" +
+ "\x06MODIFY\x10\x02\x12\n" +
+ "\n" +
+ "\x06DELETE\x10\x032\x97\x04\n" +
+ "\x04Ephs\x12Z\n" +
+ "\x03Get\x12&.fuhry.runtime.service.ephs.GetRequest\x1a'.fuhry.runtime.service.ephs.GetResponse\"\x000\x01\x12[\n" +
+ "\x03Put\x12&.fuhry.runtime.service.ephs.PutRequest\x1a(.fuhry.runtime.service.ephs.StatResponse\"\x00(\x01\x12Z\n" +
+ "\x04Stat\x12&.fuhry.runtime.service.ephs.GetRequest\x1a(.fuhry.runtime.service.ephs.StatResponse\"\x00\x12^\n" +
+ "\x05Watch\x12&.fuhry.runtime.service.ephs.GetRequest\x1a).fuhry.runtime.service.ephs.WatchResponse\"\x000\x01\x12K\n" +
+ "\x05MkDir\x12(.fuhry.runtime.service.ephs.MkDirRequest\x1a\x16.google.protobuf.Empty\"\x00\x12M\n" +
+ "\x06Delete\x12).fuhry.runtime.service.ephs.DeleteRequest\x1a\x16.google.protobuf.Empty\"\x00B)Z'go.fuhry.dev/runtime/proto/service/ephsb\x06proto3"
+
+var (
+ file_service_proto_rawDescOnce sync.Once
+ file_service_proto_rawDescData []byte
+)
+
+func file_service_proto_rawDescGZIP() []byte {
+ file_service_proto_rawDescOnce.Do(func() {
+ file_service_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_service_proto_rawDesc), len(file_service_proto_rawDesc)))
+ })
+ return file_service_proto_rawDescData
+}
+
+var file_service_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
+var file_service_proto_msgTypes = make([]protoimpl.MessageInfo, 7)
+var file_service_proto_goTypes = []any{
+ (WatchResponse_Event)(0), // 0: fuhry.runtime.service.ephs.WatchResponse.Event
+ (*GetRequest)(nil), // 1: fuhry.runtime.service.ephs.GetRequest
+ (*DeleteRequest)(nil), // 2: fuhry.runtime.service.ephs.DeleteRequest
+ (*MkDirRequest)(nil), // 3: fuhry.runtime.service.ephs.MkDirRequest
+ (*GetResponse)(nil), // 4: fuhry.runtime.service.ephs.GetResponse
+ (*StatResponse)(nil), // 5: fuhry.runtime.service.ephs.StatResponse
+ (*PutRequest)(nil), // 6: fuhry.runtime.service.ephs.PutRequest
+ (*WatchResponse)(nil), // 7: fuhry.runtime.service.ephs.WatchResponse
+ (*FsEntry)(nil), // 8: fuhry.runtime.service.ephs.FsEntry
+ (*emptypb.Empty)(nil), // 9: google.protobuf.Empty
+}
+var file_service_proto_depIdxs = []int32{
+ 8, // 0: fuhry.runtime.service.ephs.GetResponse.entry:type_name -> fuhry.runtime.service.ephs.FsEntry
+ 8, // 1: fuhry.runtime.service.ephs.StatResponse.entry:type_name -> fuhry.runtime.service.ephs.FsEntry
+ 8, // 2: fuhry.runtime.service.ephs.WatchResponse.entry:type_name -> fuhry.runtime.service.ephs.FsEntry
+ 0, // 3: fuhry.runtime.service.ephs.WatchResponse.event:type_name -> fuhry.runtime.service.ephs.WatchResponse.Event
+ 1, // 4: fuhry.runtime.service.ephs.Ephs.Get:input_type -> fuhry.runtime.service.ephs.GetRequest
+ 6, // 5: fuhry.runtime.service.ephs.Ephs.Put:input_type -> fuhry.runtime.service.ephs.PutRequest
+ 1, // 6: fuhry.runtime.service.ephs.Ephs.Stat:input_type -> fuhry.runtime.service.ephs.GetRequest
+ 1, // 7: fuhry.runtime.service.ephs.Ephs.Watch:input_type -> fuhry.runtime.service.ephs.GetRequest
+ 3, // 8: fuhry.runtime.service.ephs.Ephs.MkDir:input_type -> fuhry.runtime.service.ephs.MkDirRequest
+ 2, // 9: fuhry.runtime.service.ephs.Ephs.Delete:input_type -> fuhry.runtime.service.ephs.DeleteRequest
+ 4, // 10: fuhry.runtime.service.ephs.Ephs.Get:output_type -> fuhry.runtime.service.ephs.GetResponse
+ 5, // 11: fuhry.runtime.service.ephs.Ephs.Put:output_type -> fuhry.runtime.service.ephs.StatResponse
+ 5, // 12: fuhry.runtime.service.ephs.Ephs.Stat:output_type -> fuhry.runtime.service.ephs.StatResponse
+ 7, // 13: fuhry.runtime.service.ephs.Ephs.Watch:output_type -> fuhry.runtime.service.ephs.WatchResponse
+ 9, // 14: fuhry.runtime.service.ephs.Ephs.MkDir:output_type -> google.protobuf.Empty
+ 9, // 15: fuhry.runtime.service.ephs.Ephs.Delete:output_type -> google.protobuf.Empty
+ 10, // [10:16] is the sub-list for method output_type
+ 4, // [4:10] is the sub-list for method input_type
+ 4, // [4:4] is the sub-list for extension type_name
+ 4, // [4:4] is the sub-list for extension extendee
+ 0, // [0:4] is the sub-list for field type_name
+}
+
+func init() { file_service_proto_init() }
+func file_service_proto_init() {
+ if File_service_proto != nil {
+ return
+ }
+ file_types_proto_init()
+ type x struct{}
+ out := protoimpl.TypeBuilder{
+ File: protoimpl.DescBuilder{
+ GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+ RawDescriptor: unsafe.Slice(unsafe.StringData(file_service_proto_rawDesc), len(file_service_proto_rawDesc)),
+ NumEnums: 1,
+ NumMessages: 7,
+ NumExtensions: 0,
+ NumServices: 1,
+ },
+ GoTypes: file_service_proto_goTypes,
+ DependencyIndexes: file_service_proto_depIdxs,
+ EnumInfos: file_service_proto_enumTypes,
+ MessageInfos: file_service_proto_msgTypes,
+ }.Build()
+ File_service_proto = out.File
+ file_service_proto_goTypes = nil
+ file_service_proto_depIdxs = nil
+}
--- /dev/null
+syntax = "proto3";
+import "google/protobuf/empty.proto";
+
+option go_package = "go.fuhry.dev/runtime/proto/service/ephs";
+
+package fuhry.runtime.service.ephs;
+
+message GetRequest {
+ string path = 1;
+}
+
+message DeleteRequest {
+ string path = 1;
+ bool recursive = 2;
+}
+
+message MkDirRequest {
+ string path = 1;
+ bool recursive = 2;
+}
+
+message GetResponse {
+ FsEntry entry = 1;
+ bytes chunk = 2;
+}
+
+message StatResponse {
+ FsEntry entry = 1;
+}
+
+message PutRequest {
+ string path = 1;
+ uint64 version = 2;
+ uint64 size = 3;
+ bytes chunk = 4;
+}
+
+message WatchResponse {
+ enum Event {
+ UNKNOWN = 0;
+ CREATE = 1;
+ MODIFY = 2;
+ DELETE = 3;
+ }
+
+ FsEntry entry = 1;
+ Event event = 2;
+}
+
+service Ephs {
+ rpc Get (GetRequest) returns (stream GetResponse) {}
+ rpc Put (stream PutRequest) returns (StatResponse) {}
+ rpc Stat (GetRequest) returns (StatResponse) {}
+ rpc Watch (GetRequest) returns (stream WatchResponse) {}
+ rpc MkDir (MkDirRequest) returns (google.protobuf.Empty) {}
+ rpc Delete (DeleteRequest) returns (google.protobuf.Empty) {}
+}
--- /dev/null
+// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
+// versions:
+// - protoc-gen-go-grpc v1.3.0
+// - protoc v6.32.1
+// source: service.proto
+
+package ephs
+
+import (
+ context "context"
+ grpc "google.golang.org/grpc"
+ codes "google.golang.org/grpc/codes"
+ status "google.golang.org/grpc/status"
+ emptypb "google.golang.org/protobuf/types/known/emptypb"
+)
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the grpc package it is being compiled against.
+// Requires gRPC-Go v1.32.0 or later.
+const _ = grpc.SupportPackageIsVersion7
+
+const (
+ Ephs_Get_FullMethodName = "/fuhry.runtime.service.ephs.Ephs/Get"
+ Ephs_Put_FullMethodName = "/fuhry.runtime.service.ephs.Ephs/Put"
+ Ephs_Stat_FullMethodName = "/fuhry.runtime.service.ephs.Ephs/Stat"
+ Ephs_Watch_FullMethodName = "/fuhry.runtime.service.ephs.Ephs/Watch"
+ Ephs_MkDir_FullMethodName = "/fuhry.runtime.service.ephs.Ephs/MkDir"
+ Ephs_Delete_FullMethodName = "/fuhry.runtime.service.ephs.Ephs/Delete"
+)
+
+// EphsClient is the client API for Ephs service.
+//
+// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
+type EphsClient interface {
+ Get(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (Ephs_GetClient, error)
+ Put(ctx context.Context, opts ...grpc.CallOption) (Ephs_PutClient, error)
+ Stat(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (*StatResponse, error)
+ Watch(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (Ephs_WatchClient, error)
+ MkDir(ctx context.Context, in *MkDirRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
+ Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
+}
+
+type ephsClient struct {
+ cc grpc.ClientConnInterface
+}
+
+func NewEphsClient(cc grpc.ClientConnInterface) EphsClient {
+ return &ephsClient{cc}
+}
+
+func (c *ephsClient) Get(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (Ephs_GetClient, error) {
+ stream, err := c.cc.NewStream(ctx, &Ephs_ServiceDesc.Streams[0], Ephs_Get_FullMethodName, opts...)
+ if err != nil {
+ return nil, err
+ }
+ x := &ephsGetClient{stream}
+ if err := x.ClientStream.SendMsg(in); err != nil {
+ return nil, err
+ }
+ if err := x.ClientStream.CloseSend(); err != nil {
+ return nil, err
+ }
+ return x, nil
+}
+
+type Ephs_GetClient interface {
+ Recv() (*GetResponse, error)
+ grpc.ClientStream
+}
+
+type ephsGetClient struct {
+ grpc.ClientStream
+}
+
+func (x *ephsGetClient) Recv() (*GetResponse, error) {
+ m := new(GetResponse)
+ if err := x.ClientStream.RecvMsg(m); err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+
+func (c *ephsClient) Put(ctx context.Context, opts ...grpc.CallOption) (Ephs_PutClient, error) {
+ stream, err := c.cc.NewStream(ctx, &Ephs_ServiceDesc.Streams[1], Ephs_Put_FullMethodName, opts...)
+ if err != nil {
+ return nil, err
+ }
+ x := &ephsPutClient{stream}
+ return x, nil
+}
+
+type Ephs_PutClient interface {
+ Send(*PutRequest) error
+ CloseAndRecv() (*StatResponse, error)
+ grpc.ClientStream
+}
+
+type ephsPutClient struct {
+ grpc.ClientStream
+}
+
+func (x *ephsPutClient) Send(m *PutRequest) error {
+ return x.ClientStream.SendMsg(m)
+}
+
+func (x *ephsPutClient) CloseAndRecv() (*StatResponse, error) {
+ if err := x.ClientStream.CloseSend(); err != nil {
+ return nil, err
+ }
+ m := new(StatResponse)
+ if err := x.ClientStream.RecvMsg(m); err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+
+func (c *ephsClient) Stat(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (*StatResponse, error) {
+ out := new(StatResponse)
+ err := c.cc.Invoke(ctx, Ephs_Stat_FullMethodName, in, out, opts...)
+ if err != nil {
+ return nil, err
+ }
+ return out, nil
+}
+
+func (c *ephsClient) Watch(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (Ephs_WatchClient, error) {
+ stream, err := c.cc.NewStream(ctx, &Ephs_ServiceDesc.Streams[2], Ephs_Watch_FullMethodName, opts...)
+ if err != nil {
+ return nil, err
+ }
+ x := &ephsWatchClient{stream}
+ if err := x.ClientStream.SendMsg(in); err != nil {
+ return nil, err
+ }
+ if err := x.ClientStream.CloseSend(); err != nil {
+ return nil, err
+ }
+ return x, nil
+}
+
+type Ephs_WatchClient interface {
+ Recv() (*WatchResponse, error)
+ grpc.ClientStream
+}
+
+type ephsWatchClient struct {
+ grpc.ClientStream
+}
+
+func (x *ephsWatchClient) Recv() (*WatchResponse, error) {
+ m := new(WatchResponse)
+ if err := x.ClientStream.RecvMsg(m); err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+
+func (c *ephsClient) MkDir(ctx context.Context, in *MkDirRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
+ out := new(emptypb.Empty)
+ err := c.cc.Invoke(ctx, Ephs_MkDir_FullMethodName, in, out, opts...)
+ if err != nil {
+ return nil, err
+ }
+ return out, nil
+}
+
+func (c *ephsClient) Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
+ out := new(emptypb.Empty)
+ err := c.cc.Invoke(ctx, Ephs_Delete_FullMethodName, in, out, opts...)
+ if err != nil {
+ return nil, err
+ }
+ return out, nil
+}
+
+// EphsServer is the server API for Ephs service.
+// All implementations must embed UnimplementedEphsServer
+// for forward compatibility
+type EphsServer interface {
+ Get(*GetRequest, Ephs_GetServer) error
+ Put(Ephs_PutServer) error
+ Stat(context.Context, *GetRequest) (*StatResponse, error)
+ Watch(*GetRequest, Ephs_WatchServer) error
+ MkDir(context.Context, *MkDirRequest) (*emptypb.Empty, error)
+ Delete(context.Context, *DeleteRequest) (*emptypb.Empty, error)
+ mustEmbedUnimplementedEphsServer()
+}
+
+// UnimplementedEphsServer must be embedded to have forward compatible implementations.
+type UnimplementedEphsServer struct {
+}
+
+func (UnimplementedEphsServer) Get(*GetRequest, Ephs_GetServer) error {
+ return status.Errorf(codes.Unimplemented, "method Get not implemented")
+}
+func (UnimplementedEphsServer) Put(Ephs_PutServer) error {
+ return status.Errorf(codes.Unimplemented, "method Put not implemented")
+}
+func (UnimplementedEphsServer) Stat(context.Context, *GetRequest) (*StatResponse, error) {
+ return nil, status.Errorf(codes.Unimplemented, "method Stat not implemented")
+}
+func (UnimplementedEphsServer) Watch(*GetRequest, Ephs_WatchServer) error {
+ return status.Errorf(codes.Unimplemented, "method Watch not implemented")
+}
+func (UnimplementedEphsServer) MkDir(context.Context, *MkDirRequest) (*emptypb.Empty, error) {
+ return nil, status.Errorf(codes.Unimplemented, "method MkDir not implemented")
+}
+func (UnimplementedEphsServer) Delete(context.Context, *DeleteRequest) (*emptypb.Empty, error) {
+ return nil, status.Errorf(codes.Unimplemented, "method Delete not implemented")
+}
+func (UnimplementedEphsServer) mustEmbedUnimplementedEphsServer() {}
+
+// UnsafeEphsServer may be embedded to opt out of forward compatibility for this service.
+// Use of this interface is not recommended, as added methods to EphsServer will
+// result in compilation errors.
+type UnsafeEphsServer interface {
+ mustEmbedUnimplementedEphsServer()
+}
+
+func RegisterEphsServer(s grpc.ServiceRegistrar, srv EphsServer) {
+ s.RegisterService(&Ephs_ServiceDesc, srv)
+}
+
+func _Ephs_Get_Handler(srv interface{}, stream grpc.ServerStream) error {
+ m := new(GetRequest)
+ if err := stream.RecvMsg(m); err != nil {
+ return err
+ }
+ return srv.(EphsServer).Get(m, &ephsGetServer{stream})
+}
+
+type Ephs_GetServer interface {
+ Send(*GetResponse) error
+ grpc.ServerStream
+}
+
+type ephsGetServer struct {
+ grpc.ServerStream
+}
+
+func (x *ephsGetServer) Send(m *GetResponse) error {
+ return x.ServerStream.SendMsg(m)
+}
+
+func _Ephs_Put_Handler(srv interface{}, stream grpc.ServerStream) error {
+ return srv.(EphsServer).Put(&ephsPutServer{stream})
+}
+
+type Ephs_PutServer interface {
+ SendAndClose(*StatResponse) error
+ Recv() (*PutRequest, error)
+ grpc.ServerStream
+}
+
+type ephsPutServer struct {
+ grpc.ServerStream
+}
+
+func (x *ephsPutServer) SendAndClose(m *StatResponse) error {
+ return x.ServerStream.SendMsg(m)
+}
+
+func (x *ephsPutServer) Recv() (*PutRequest, error) {
+ m := new(PutRequest)
+ if err := x.ServerStream.RecvMsg(m); err != nil {
+ return nil, err
+ }
+ return m, nil
+}
+
+func _Ephs_Stat_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+ in := new(GetRequest)
+ if err := dec(in); err != nil {
+ return nil, err
+ }
+ if interceptor == nil {
+ return srv.(EphsServer).Stat(ctx, in)
+ }
+ info := &grpc.UnaryServerInfo{
+ Server: srv,
+ FullMethod: Ephs_Stat_FullMethodName,
+ }
+ handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+ return srv.(EphsServer).Stat(ctx, req.(*GetRequest))
+ }
+ return interceptor(ctx, in, info, handler)
+}
+
+func _Ephs_Watch_Handler(srv interface{}, stream grpc.ServerStream) error {
+ m := new(GetRequest)
+ if err := stream.RecvMsg(m); err != nil {
+ return err
+ }
+ return srv.(EphsServer).Watch(m, &ephsWatchServer{stream})
+}
+
+type Ephs_WatchServer interface {
+ Send(*WatchResponse) error
+ grpc.ServerStream
+}
+
+type ephsWatchServer struct {
+ grpc.ServerStream
+}
+
+func (x *ephsWatchServer) Send(m *WatchResponse) error {
+ return x.ServerStream.SendMsg(m)
+}
+
+func _Ephs_MkDir_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+ in := new(MkDirRequest)
+ if err := dec(in); err != nil {
+ return nil, err
+ }
+ if interceptor == nil {
+ return srv.(EphsServer).MkDir(ctx, in)
+ }
+ info := &grpc.UnaryServerInfo{
+ Server: srv,
+ FullMethod: Ephs_MkDir_FullMethodName,
+ }
+ handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+ return srv.(EphsServer).MkDir(ctx, req.(*MkDirRequest))
+ }
+ return interceptor(ctx, in, info, handler)
+}
+
+func _Ephs_Delete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+ in := new(DeleteRequest)
+ if err := dec(in); err != nil {
+ return nil, err
+ }
+ if interceptor == nil {
+ return srv.(EphsServer).Delete(ctx, in)
+ }
+ info := &grpc.UnaryServerInfo{
+ Server: srv,
+ FullMethod: Ephs_Delete_FullMethodName,
+ }
+ handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+ return srv.(EphsServer).Delete(ctx, req.(*DeleteRequest))
+ }
+ return interceptor(ctx, in, info, handler)
+}
+
+// Ephs_ServiceDesc is the grpc.ServiceDesc for Ephs service.
+// It's only intended for direct use with grpc.RegisterService,
+// and not to be introspected or modified (even as a copy)
+var Ephs_ServiceDesc = grpc.ServiceDesc{
+ ServiceName: "fuhry.runtime.service.ephs.Ephs",
+ HandlerType: (*EphsServer)(nil),
+ Methods: []grpc.MethodDesc{
+ {
+ MethodName: "Stat",
+ Handler: _Ephs_Stat_Handler,
+ },
+ {
+ MethodName: "MkDir",
+ Handler: _Ephs_MkDir_Handler,
+ },
+ {
+ MethodName: "Delete",
+ Handler: _Ephs_Delete_Handler,
+ },
+ },
+ Streams: []grpc.StreamDesc{
+ {
+ StreamName: "Get",
+ Handler: _Ephs_Get_Handler,
+ ServerStreams: true,
+ },
+ {
+ StreamName: "Put",
+ Handler: _Ephs_Put_Handler,
+ ClientStreams: true,
+ },
+ {
+ StreamName: "Watch",
+ Handler: _Ephs_Watch_Handler,
+ ServerStreams: true,
+ },
+ },
+ Metadata: "service.proto",
+}
--- /dev/null
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// protoc-gen-go v1.36.10
+// protoc v6.32.1
+// source: types.proto
+
+package ephs
+
+import (
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+ timestamppb "google.golang.org/protobuf/types/known/timestamppb"
+ reflect "reflect"
+ sync "sync"
+ unsafe "unsafe"
+)
+
+const (
+ // Verify that this generated code is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+ // Verify that runtime/protoimpl is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+type FsEntry_File_Compression int32
+
+const (
+ FsEntry_File_UNCOMPRESSED FsEntry_File_Compression = 0
+ FsEntry_File_GZIP FsEntry_File_Compression = 1
+)
+
+// Enum value maps for FsEntry_File_Compression.
+var (
+ FsEntry_File_Compression_name = map[int32]string{
+ 0: "UNCOMPRESSED",
+ 1: "GZIP",
+ }
+ FsEntry_File_Compression_value = map[string]int32{
+ "UNCOMPRESSED": 0,
+ "GZIP": 1,
+ }
+)
+
+func (x FsEntry_File_Compression) Enum() *FsEntry_File_Compression {
+ p := new(FsEntry_File_Compression)
+ *p = x
+ return p
+}
+
+func (x FsEntry_File_Compression) String() string {
+ return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
+}
+
+func (FsEntry_File_Compression) Descriptor() protoreflect.EnumDescriptor {
+ return file_types_proto_enumTypes[0].Descriptor()
+}
+
+func (FsEntry_File_Compression) Type() protoreflect.EnumType {
+ return &file_types_proto_enumTypes[0]
+}
+
+func (x FsEntry_File_Compression) Number() protoreflect.EnumNumber {
+ return protoreflect.EnumNumber(x)
+}
+
+// Deprecated: Use FsEntry_File_Compression.Descriptor instead.
+func (FsEntry_File_Compression) EnumDescriptor() ([]byte, []int) {
+ return file_types_proto_rawDescGZIP(), []int{0, 1, 0}
+}
+
+type FsEntry struct {
+ state protoimpl.MessageState `protogen:"open.v1"`
+ Created *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=created,proto3" json:"created,omitempty"`
+ Modified *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=modified,proto3" json:"modified,omitempty"`
+ Version uint64 `protobuf:"varint,3,opt,name=version,proto3" json:"version,omitempty"`
+ Size uint64 `protobuf:"varint,4,opt,name=size,proto3" json:"size,omitempty"`
+ Owner string `protobuf:"bytes,5,opt,name=owner,proto3" json:"owner,omitempty"`
+ Content *FsEntry_Content `protobuf:"bytes,6,opt,name=content,proto3" json:"content,omitempty"`
+ unknownFields protoimpl.UnknownFields
+ sizeCache protoimpl.SizeCache
+}
+
+func (x *FsEntry) Reset() {
+ *x = FsEntry{}
+ mi := &file_types_proto_msgTypes[0]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+}
+
+func (x *FsEntry) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FsEntry) ProtoMessage() {}
+
+func (x *FsEntry) ProtoReflect() protoreflect.Message {
+ mi := &file_types_proto_msgTypes[0]
+ if x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use FsEntry.ProtoReflect.Descriptor instead.
+func (*FsEntry) Descriptor() ([]byte, []int) {
+ return file_types_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *FsEntry) GetCreated() *timestamppb.Timestamp {
+ if x != nil {
+ return x.Created
+ }
+ return nil
+}
+
+func (x *FsEntry) GetModified() *timestamppb.Timestamp {
+ if x != nil {
+ return x.Modified
+ }
+ return nil
+}
+
+func (x *FsEntry) GetVersion() uint64 {
+ if x != nil {
+ return x.Version
+ }
+ return 0
+}
+
+func (x *FsEntry) GetSize() uint64 {
+ if x != nil {
+ return x.Size
+ }
+ return 0
+}
+
+func (x *FsEntry) GetOwner() string {
+ if x != nil {
+ return x.Owner
+ }
+ return ""
+}
+
+func (x *FsEntry) GetContent() *FsEntry_Content {
+ if x != nil {
+ return x.Content
+ }
+ return nil
+}
+
+type FsEntry_Directory struct {
+ state protoimpl.MessageState `protogen:"open.v1"`
+ Entries []*FsEntry_Directory_DirectoryEntry `protobuf:"bytes,1,rep,name=entries,proto3" json:"entries,omitempty"`
+ unknownFields protoimpl.UnknownFields
+ sizeCache protoimpl.SizeCache
+}
+
+func (x *FsEntry_Directory) Reset() {
+ *x = FsEntry_Directory{}
+ mi := &file_types_proto_msgTypes[1]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+}
+
+func (x *FsEntry_Directory) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FsEntry_Directory) ProtoMessage() {}
+
+func (x *FsEntry_Directory) ProtoReflect() protoreflect.Message {
+ mi := &file_types_proto_msgTypes[1]
+ if x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use FsEntry_Directory.ProtoReflect.Descriptor instead.
+func (*FsEntry_Directory) Descriptor() ([]byte, []int) {
+ return file_types_proto_rawDescGZIP(), []int{0, 0}
+}
+
+func (x *FsEntry_Directory) GetEntries() []*FsEntry_Directory_DirectoryEntry {
+ if x != nil {
+ return x.Entries
+ }
+ return nil
+}
+
+type FsEntry_File struct {
+ state protoimpl.MessageState `protogen:"open.v1"`
+ Content []byte `protobuf:"bytes,1,opt,name=content,proto3" json:"content,omitempty"`
+ Compression FsEntry_File_Compression `protobuf:"varint,2,opt,name=compression,proto3,enum=fuhry.runtime.service.ephs.FsEntry_File_Compression" json:"compression,omitempty"`
+ unknownFields protoimpl.UnknownFields
+ sizeCache protoimpl.SizeCache
+}
+
+func (x *FsEntry_File) Reset() {
+ *x = FsEntry_File{}
+ mi := &file_types_proto_msgTypes[2]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+}
+
+func (x *FsEntry_File) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FsEntry_File) ProtoMessage() {}
+
+func (x *FsEntry_File) ProtoReflect() protoreflect.Message {
+ mi := &file_types_proto_msgTypes[2]
+ if x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use FsEntry_File.ProtoReflect.Descriptor instead.
+func (*FsEntry_File) Descriptor() ([]byte, []int) {
+ return file_types_proto_rawDescGZIP(), []int{0, 1}
+}
+
+func (x *FsEntry_File) GetContent() []byte {
+ if x != nil {
+ return x.Content
+ }
+ return nil
+}
+
+func (x *FsEntry_File) GetCompression() FsEntry_File_Compression {
+ if x != nil {
+ return x.Compression
+ }
+ return FsEntry_File_UNCOMPRESSED
+}
+
+type FsEntry_LargeFile struct {
+ state protoimpl.MessageState `protogen:"open.v1"`
+ Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
+ unknownFields protoimpl.UnknownFields
+ sizeCache protoimpl.SizeCache
+}
+
+func (x *FsEntry_LargeFile) Reset() {
+ *x = FsEntry_LargeFile{}
+ mi := &file_types_proto_msgTypes[3]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+}
+
+func (x *FsEntry_LargeFile) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FsEntry_LargeFile) ProtoMessage() {}
+
+func (x *FsEntry_LargeFile) ProtoReflect() protoreflect.Message {
+ mi := &file_types_proto_msgTypes[3]
+ if x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use FsEntry_LargeFile.ProtoReflect.Descriptor instead.
+func (*FsEntry_LargeFile) Descriptor() ([]byte, []int) {
+ return file_types_proto_rawDescGZIP(), []int{0, 2}
+}
+
+func (x *FsEntry_LargeFile) GetKey() string {
+ if x != nil {
+ return x.Key
+ }
+ return ""
+}
+
+type FsEntry_Content struct {
+ state protoimpl.MessageState `protogen:"open.v1"`
+ // Types that are valid to be assigned to Content:
+ //
+ // *FsEntry_Content_Directory
+ // *FsEntry_Content_File
+ // *FsEntry_Content_LargeFile
+ Content isFsEntry_Content_Content `protobuf_oneof:"content"`
+ unknownFields protoimpl.UnknownFields
+ sizeCache protoimpl.SizeCache
+}
+
+func (x *FsEntry_Content) Reset() {
+ *x = FsEntry_Content{}
+ mi := &file_types_proto_msgTypes[4]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+}
+
+func (x *FsEntry_Content) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FsEntry_Content) ProtoMessage() {}
+
+func (x *FsEntry_Content) ProtoReflect() protoreflect.Message {
+ mi := &file_types_proto_msgTypes[4]
+ if x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use FsEntry_Content.ProtoReflect.Descriptor instead.
+func (*FsEntry_Content) Descriptor() ([]byte, []int) {
+ return file_types_proto_rawDescGZIP(), []int{0, 3}
+}
+
+func (x *FsEntry_Content) GetContent() isFsEntry_Content_Content {
+ if x != nil {
+ return x.Content
+ }
+ return nil
+}
+
+func (x *FsEntry_Content) GetDirectory() *FsEntry_Directory {
+ if x != nil {
+ if x, ok := x.Content.(*FsEntry_Content_Directory); ok {
+ return x.Directory
+ }
+ }
+ return nil
+}
+
+func (x *FsEntry_Content) GetFile() *FsEntry_File {
+ if x != nil {
+ if x, ok := x.Content.(*FsEntry_Content_File); ok {
+ return x.File
+ }
+ }
+ return nil
+}
+
+func (x *FsEntry_Content) GetLargeFile() *FsEntry_LargeFile {
+ if x != nil {
+ if x, ok := x.Content.(*FsEntry_Content_LargeFile); ok {
+ return x.LargeFile
+ }
+ }
+ return nil
+}
+
+type isFsEntry_Content_Content interface {
+ isFsEntry_Content_Content()
+}
+
+type FsEntry_Content_Directory struct {
+ Directory *FsEntry_Directory `protobuf:"bytes,1,opt,name=directory,proto3,oneof"`
+}
+
+type FsEntry_Content_File struct {
+ File *FsEntry_File `protobuf:"bytes,2,opt,name=file,proto3,oneof"`
+}
+
+type FsEntry_Content_LargeFile struct {
+ LargeFile *FsEntry_LargeFile `protobuf:"bytes,3,opt,name=large_file,json=largeFile,proto3,oneof"`
+}
+
+func (*FsEntry_Content_Directory) isFsEntry_Content_Content() {}
+
+func (*FsEntry_Content_File) isFsEntry_Content_Content() {}
+
+func (*FsEntry_Content_LargeFile) isFsEntry_Content_Content() {}
+
+type FsEntry_Directory_DirectoryEntry struct {
+ state protoimpl.MessageState `protogen:"open.v1"`
+ Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
+ Directory bool `protobuf:"varint,2,opt,name=directory,proto3" json:"directory,omitempty"`
+ unknownFields protoimpl.UnknownFields
+ sizeCache protoimpl.SizeCache
+}
+
+func (x *FsEntry_Directory_DirectoryEntry) Reset() {
+ *x = FsEntry_Directory_DirectoryEntry{}
+ mi := &file_types_proto_msgTypes[5]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+}
+
+func (x *FsEntry_Directory_DirectoryEntry) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*FsEntry_Directory_DirectoryEntry) ProtoMessage() {}
+
+func (x *FsEntry_Directory_DirectoryEntry) ProtoReflect() protoreflect.Message {
+ mi := &file_types_proto_msgTypes[5]
+ if x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use FsEntry_Directory_DirectoryEntry.ProtoReflect.Descriptor instead.
+func (*FsEntry_Directory_DirectoryEntry) Descriptor() ([]byte, []int) {
+ return file_types_proto_rawDescGZIP(), []int{0, 0, 0}
+}
+
+func (x *FsEntry_Directory_DirectoryEntry) GetName() string {
+ if x != nil {
+ return x.Name
+ }
+ return ""
+}
+
+func (x *FsEntry_Directory_DirectoryEntry) GetDirectory() bool {
+ if x != nil {
+ return x.Directory
+ }
+ return false
+}
+
+var File_types_proto protoreflect.FileDescriptor
+
+const file_types_proto_rawDesc = "" +
+ "\n" +
+ "\vtypes.proto\x12\x1afuhry.runtime.service.ephs\x1a\x1fgoogle/protobuf/timestamp.proto\"\xe7\x06\n" +
+ "\aFsEntry\x124\n" +
+ "\acreated\x18\x01 \x01(\v2\x1a.google.protobuf.TimestampR\acreated\x126\n" +
+ "\bmodified\x18\x02 \x01(\v2\x1a.google.protobuf.TimestampR\bmodified\x12\x18\n" +
+ "\aversion\x18\x03 \x01(\x04R\aversion\x12\x12\n" +
+ "\x04size\x18\x04 \x01(\x04R\x04size\x12\x14\n" +
+ "\x05owner\x18\x05 \x01(\tR\x05owner\x12E\n" +
+ "\acontent\x18\x06 \x01(\v2+.fuhry.runtime.service.ephs.FsEntry.ContentR\acontent\x1a\xa7\x01\n" +
+ "\tDirectory\x12V\n" +
+ "\aentries\x18\x01 \x03(\v2<.fuhry.runtime.service.ephs.FsEntry.Directory.DirectoryEntryR\aentries\x1aB\n" +
+ "\x0eDirectoryEntry\x12\x12\n" +
+ "\x04name\x18\x01 \x01(\tR\x04name\x12\x1c\n" +
+ "\tdirectory\x18\x02 \x01(\bR\tdirectory\x1a\xa3\x01\n" +
+ "\x04File\x12\x18\n" +
+ "\acontent\x18\x01 \x01(\fR\acontent\x12V\n" +
+ "\vcompression\x18\x02 \x01(\x0e24.fuhry.runtime.service.ephs.FsEntry.File.CompressionR\vcompression\")\n" +
+ "\vCompression\x12\x10\n" +
+ "\fUNCOMPRESSED\x10\x00\x12\b\n" +
+ "\x04GZIP\x10\x01\x1a\x1d\n" +
+ "\tLargeFile\x12\x10\n" +
+ "\x03key\x18\x01 \x01(\tR\x03key\x1a\xf3\x01\n" +
+ "\aContent\x12M\n" +
+ "\tdirectory\x18\x01 \x01(\v2-.fuhry.runtime.service.ephs.FsEntry.DirectoryH\x00R\tdirectory\x12>\n" +
+ "\x04file\x18\x02 \x01(\v2(.fuhry.runtime.service.ephs.FsEntry.FileH\x00R\x04file\x12N\n" +
+ "\n" +
+ "large_file\x18\x03 \x01(\v2-.fuhry.runtime.service.ephs.FsEntry.LargeFileH\x00R\tlargeFileB\t\n" +
+ "\acontentB)Z'go.fuhry.dev/runtime/proto/service/ephsb\x06proto3"
+
+var (
+ file_types_proto_rawDescOnce sync.Once
+ file_types_proto_rawDescData []byte
+)
+
+func file_types_proto_rawDescGZIP() []byte {
+ file_types_proto_rawDescOnce.Do(func() {
+ file_types_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_types_proto_rawDesc), len(file_types_proto_rawDesc)))
+ })
+ return file_types_proto_rawDescData
+}
+
+var file_types_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
+var file_types_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
+var file_types_proto_goTypes = []any{
+ (FsEntry_File_Compression)(0), // 0: fuhry.runtime.service.ephs.FsEntry.File.Compression
+ (*FsEntry)(nil), // 1: fuhry.runtime.service.ephs.FsEntry
+ (*FsEntry_Directory)(nil), // 2: fuhry.runtime.service.ephs.FsEntry.Directory
+ (*FsEntry_File)(nil), // 3: fuhry.runtime.service.ephs.FsEntry.File
+ (*FsEntry_LargeFile)(nil), // 4: fuhry.runtime.service.ephs.FsEntry.LargeFile
+ (*FsEntry_Content)(nil), // 5: fuhry.runtime.service.ephs.FsEntry.Content
+ (*FsEntry_Directory_DirectoryEntry)(nil), // 6: fuhry.runtime.service.ephs.FsEntry.Directory.DirectoryEntry
+ (*timestamppb.Timestamp)(nil), // 7: google.protobuf.Timestamp
+}
+var file_types_proto_depIdxs = []int32{
+ 7, // 0: fuhry.runtime.service.ephs.FsEntry.created:type_name -> google.protobuf.Timestamp
+ 7, // 1: fuhry.runtime.service.ephs.FsEntry.modified:type_name -> google.protobuf.Timestamp
+ 5, // 2: fuhry.runtime.service.ephs.FsEntry.content:type_name -> fuhry.runtime.service.ephs.FsEntry.Content
+ 6, // 3: fuhry.runtime.service.ephs.FsEntry.Directory.entries:type_name -> fuhry.runtime.service.ephs.FsEntry.Directory.DirectoryEntry
+ 0, // 4: fuhry.runtime.service.ephs.FsEntry.File.compression:type_name -> fuhry.runtime.service.ephs.FsEntry.File.Compression
+ 2, // 5: fuhry.runtime.service.ephs.FsEntry.Content.directory:type_name -> fuhry.runtime.service.ephs.FsEntry.Directory
+ 3, // 6: fuhry.runtime.service.ephs.FsEntry.Content.file:type_name -> fuhry.runtime.service.ephs.FsEntry.File
+ 4, // 7: fuhry.runtime.service.ephs.FsEntry.Content.large_file:type_name -> fuhry.runtime.service.ephs.FsEntry.LargeFile
+ 8, // [8:8] is the sub-list for method output_type
+ 8, // [8:8] is the sub-list for method input_type
+ 8, // [8:8] is the sub-list for extension type_name
+ 8, // [8:8] is the sub-list for extension extendee
+ 0, // [0:8] is the sub-list for field type_name
+}
+
+func init() { file_types_proto_init() }
+func file_types_proto_init() {
+ if File_types_proto != nil {
+ return
+ }
+ file_types_proto_msgTypes[4].OneofWrappers = []any{
+ (*FsEntry_Content_Directory)(nil),
+ (*FsEntry_Content_File)(nil),
+ (*FsEntry_Content_LargeFile)(nil),
+ }
+ type x struct{}
+ out := protoimpl.TypeBuilder{
+ File: protoimpl.DescBuilder{
+ GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+ RawDescriptor: unsafe.Slice(unsafe.StringData(file_types_proto_rawDesc), len(file_types_proto_rawDesc)),
+ NumEnums: 1,
+ NumMessages: 6,
+ NumExtensions: 0,
+ NumServices: 0,
+ },
+ GoTypes: file_types_proto_goTypes,
+ DependencyIndexes: file_types_proto_depIdxs,
+ EnumInfos: file_types_proto_enumTypes,
+ MessageInfos: file_types_proto_msgTypes,
+ }.Build()
+ File_types_proto = out.File
+ file_types_proto_goTypes = nil
+ file_types_proto_depIdxs = nil
+}
--- /dev/null
+syntax = "proto3";
+
+import "google/protobuf/timestamp.proto";
+
+option go_package = "go.fuhry.dev/runtime/proto/service/ephs";
+
+package fuhry.runtime.service.ephs;
+
+message FsEntry {
+ message Directory {
+ message DirectoryEntry {
+ string name = 1;
+ bool directory = 2;
+ }
+ repeated DirectoryEntry entries = 1;
+ }
+
+ message File {
+ enum Compression {
+ UNCOMPRESSED = 0;
+ GZIP = 1;
+ }
+ bytes content = 1;
+ Compression compression = 2;
+ }
+
+ message LargeFile {
+ string key = 1;
+ }
+
+ message Content {
+ oneof content {
+ Directory directory = 1;
+ File file = 2;
+ LargeFile large_file = 3;
+ }
+ }
+
+ google.protobuf.Timestamp created = 1;
+ google.protobuf.Timestamp modified = 2;
+ uint64 version = 3;
+ uint64 size = 4;
+ string owner = 5;
+ Content content = 6;
+}
--- /dev/null
+package stringmatch
+
+import (
+ "errors"
+ "fmt"
+)
+
+type NewSyntaxMatchRule struct {
+ Prefix *struct {
+ V string `yaml:"prefix"`
+ } `yaml:",inline"`
+ Suffix *struct {
+ V string `yaml:"suffix"`
+ } `yaml:",inline"`
+ Exact *struct {
+ V string `yaml:"exact"`
+ } `yaml:",inline"`
+ Contains *struct {
+ V string `yaml:"contains"`
+ } `yaml:",inline"`
+ Regexp *struct {
+ V string `yaml:"regexp"`
+ } `yaml:",inline"`
+ Any *struct {
+ V bool `yaml:"any"`
+ } `yaml:",inline"`
+ Never *struct {
+ V bool `yaml:"never"`
+ } `yaml:",inline"`
+ And *struct {
+ V []*NewSyntaxMatchRule `yaml:"and"`
+ } `yaml:",inline"`
+ Or *struct {
+ V []*NewSyntaxMatchRule `yaml:"or"`
+ } `yaml:",inline"`
+}
+
+func (m *NewSyntaxMatchRule) Matcher() (StringMatcher, error) {
+ var out StringMatcher
+ count := 0
+
+ if m.Prefix != nil {
+ out = Prefix(m.Prefix.V)
+ count++
+ }
+ if m.Suffix != nil {
+ out = Suffix(m.Suffix.V)
+ count++
+ }
+ if m.Exact != nil {
+ out = Exact(m.Exact.V)
+ count++
+ }
+ if m.Contains != nil {
+ out = Contains(m.Contains.V)
+ count++
+ }
+ if m.Regexp != nil {
+ out = Regexp(m.Regexp.V)
+ count++
+ }
+ if m.Any != nil {
+ if m.Any.V != true {
+ return nil, errors.New("any may only be true")
+ }
+ out = Any{}
+ count++
+ }
+ if m.Never != nil && m.Never.V {
+ if m.Never.V != true {
+ return nil, errors.New("never may only be true")
+ }
+ out = Never{}
+ count++
+ }
+ if m.And != nil && len(m.And.V) > 0 {
+ count++
+ out = &andMatcher{}
+ for i, m := range m.And.V {
+ mat, err := m.Matcher()
+ if err != nil {
+ return nil, fmt.Errorf("and-rule at index %d: %v", i, err)
+ }
+ out.(*andMatcher).matchers = append(out.(*andMatcher).matchers, mat)
+ }
+ }
+ if m.Or != nil && len(m.Or.V) > 0 {
+ count++
+ out = &orMatcher{}
+ for i, m := range m.Or.V {
+ mat, err := m.Matcher()
+ if err != nil {
+ return nil, fmt.Errorf("or-rule at index %d: %v", i, err)
+ }
+ out.(*orMatcher).matchers = append(out.(*orMatcher).matchers, mat)
+ }
+ }
+
+ if count != 1 {
+ return nil, fmt.Errorf(
+ "match rule must contain exactly one of: prefix, suffix, exact, " +
+ "contains, regexp, any, never, and, or")
+ }
+
+ return out, nil
+}
+
+func (m *NewSyntaxMatchRule) Match(v string) bool {
+ mat, err := m.Matcher()
+ if err != nil {
+ panic(err)
+ }
+ return mat.Match(v)
+}
--- /dev/null
+package stringmatch
+
+import (
+ "bytes"
+ "fmt"
+ "os"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+ "gopkg.in/yaml.v3"
+)
+
+type ptr[T any] interface {
+ *T
+}
+
+func mustDecode[T any, PT ptr[T]](t *testing.T, str string) PT {
+ out := new(T)
+ buf := bytes.NewBufferString(str)
+ decoder := yaml.NewDecoder(buf)
+ if err := decoder.Decode(out); err != nil {
+ fmt.Fprintf(os.Stderr, "error decoding yaml %q: %v", str, err)
+ t.FailNow()
+ }
+
+ return out
+}
+
+func TestNSMExact(t *testing.T) {
+ m := mustDecode[NewSyntaxMatchRule](t, "exact: meh")
+ assert.True(t, m.Match("meh"))
+}
+
+func TestNSMPrefix(t *testing.T) {
+ m := mustDecode[NewSyntaxMatchRule](t, "prefix: meh")
+ assert.True(t, m.Match("mehh"))
+ assert.False(t, m.Match("ehh"))
+}
+
+func TestNSMSuffix(t *testing.T) {
+ m := mustDecode[NewSyntaxMatchRule](t, "suffix: eh")
+ assert.True(t, m.Match("meh"))
+ assert.False(t, m.Match("ehh"))
+}
+
+func TestNSMContains(t *testing.T) {
+ m := mustDecode[NewSyntaxMatchRule](t, "contains: eh")
+ assert.True(t, m.Match("meh"))
+ assert.False(t, m.Match("uh"))
+}
+
+func TestNSMRegexp(t *testing.T) {
+ m := mustDecode[NewSyntaxMatchRule](t, "regexp: ^meh$")
+ assert.True(t, m.Match("meh"))
+ assert.False(t, m.Match("ehh"))
+}
+
+func TestNSMAnd(t *testing.T) {
+ const rules = `and:
+ - prefix: f
+ - suffix: oo
+`
+ m := mustDecode[NewSyntaxMatchRule](t, rules)
+ assert.True(t, m.Match("foo"))
+ assert.False(t, m.Match("boo"))
+}
+
+func TestNSMOr(t *testing.T) {
+ const rules = `or:
+ - prefix: f
+ - suffix: oo
+`
+ m := mustDecode[NewSyntaxMatchRule](t, rules)
+ assert.True(t, m.Match("foo"))
+ assert.True(t, m.Match("boo"))
+ assert.False(t, m.Match("bar"))
+}
+
+func TestNSMErrors(t *testing.T) {
+ _, err := mustDecode[NewSyntaxMatchRule](t, "startswith: invalid").Matcher()
+ assert.Error(t, err)
+
+ _, err = mustDecode[NewSyntaxMatchRule](t, "contains: a\nprefix: b").Matcher()
+ assert.Error(t, err)
+
+ _, err = mustDecode[NewSyntaxMatchRule](t, "any: false\nprefix: b").Matcher()
+ assert.Error(t, err)
+}