]> go.fuhry.dev Git - runtime.git/commitdiff
Genericize "option" interface
authorDan Fuhry <dan@fuhry.com>
Wed, 19 Nov 2025 21:36:13 +0000 (16:36 -0500)
committerDan Fuhry <dan@fuhry.com>
Wed, 19 Nov 2025 21:45:46 +0000 (16:45 -0500)
"Option" variadic pattern has been used in lots of things lately - grpc client and server, ephs, etc. Although the implementation is trivial, the repetition was becoming obnoxious.

Other trivial fixes:
- Fixed sd watcher wait group never calling Done if etcd client context is canceled
- Fixed etcd_factory type declaration

12 files changed:
ephs/BUILD.bazel
ephs/client.go
ephs/servicer/BUILD.bazel
ephs/servicer/servicer.go
grpc/internal/client/BUILD.bazel
grpc/internal/client/client.go
grpc/internal/server/BUILD.bazel
grpc/internal/server/server.go
sd/etcd_factory.go
sd/watcher.go
utils/option/BUILD.bazel [new file with mode: 0644]
utils/option/option.go [new file with mode: 0644]

index 242cfba06030dfbd6ecf5a326ab3636d990b01c6..8c1413f4f9343d3b5806ad7cdbf5061cc73ad7a7 100644 (file)
@@ -10,7 +10,9 @@ go_library(
         "//grpc/internal/common",
         "//mtls",
         "//proto/service/ephs",
+        "//utils/context",
         "//utils/log",
+        "//utils/option",
         "@com_github_quic_go_quic_go//:quic-go",
         "@org_golang_google_grpc//codes",
         "@org_golang_google_grpc//status",
index 0ca364b23704188a0b9a10d2bce4edaddca487d1..4f3c55bb981f157f5cfcdffd7f98650f26c80a29 100644 (file)
@@ -2,7 +2,6 @@ package ephs
 
 import (
        "bytes"
-       "context"
        "errors"
        "fmt"
        "io"
@@ -16,7 +15,9 @@ import (
        grpc_common "go.fuhry.dev/runtime/grpc/internal/common"
        "go.fuhry.dev/runtime/mtls"
        ephs_pb "go.fuhry.dev/runtime/proto/service/ephs"
+       "go.fuhry.dev/runtime/utils/context"
        "go.fuhry.dev/runtime/utils/log"
+       "go.fuhry.dev/runtime/utils/option"
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/status"
 )
@@ -47,13 +48,7 @@ type Client interface {
        Watch(ctx context.Context, path string) (<-chan *ephs_pb.WatchResponse, error)
 }
 
-type ClientOption interface {
-       Apply(*clientImpl) error
-}
-
-type genericClientOption struct {
-       apply func(*clientImpl) error
-}
+type ClientOption = option.Option[*clientImpl]
 
 type clientImpl struct {
        defaultCtx     context.Context
@@ -131,17 +126,11 @@ func FormatFsEntry(e *ephs_pb.FsEntry) string {
        return out
 }
 
-func (o *genericClientOption) Apply(c *clientImpl) error {
-       return o.apply(c)
-}
-
 func WithDefaultTimeout(d time.Duration) ClientOption {
-       return &genericClientOption{
-               apply: func(c *clientImpl) error {
-                       c.defaultTimeout = d
-                       return nil
-               },
-       }
+       return option.NewOption(func(c *clientImpl) error {
+               c.defaultTimeout = d
+               return nil
+       })
 }
 
 func DefaultClient() (Client, error) {
@@ -153,7 +142,7 @@ func DefaultClient() (Client, error) {
        }
 
        if DefaultClientContext == nil {
-               DefaultClientContext = context.Background()
+               DefaultClientContext, _ = context.Interruptible()
        }
 
        client, err := NewClient(DefaultClientContext, mtls.DefaultIdentity())
@@ -412,7 +401,7 @@ func (c *clientImpl) watch(origCtx context.Context, origPath string, stream ephs
 
 func (c *clientImpl) grpcClient() (ephs_pb.EphsClient, error) {
        var err error
-       serverId := mtls.NewServiceIdentity("ephs")
+       serverId := mtls.NewRemoteServiceIdentity("ephs")
 
        if c.client == nil {
                c.client, err = grpc.NewGrpcClient(c.defaultCtx, serverId, c.id,
index 0a8aae79fd70bf8c80535db874fc682d29e2b17f..cff5bc5228a9ac4ce3cb1d9c9b9120b58eba4b33 100644 (file)
@@ -21,6 +21,7 @@ go_library(
         "//sd",
         "//utils/hostname",
         "//utils/log",
+        "//utils/option",
         "//utils/stringmatch",
         "@com_github_minio_minio_go_v7//:minio-go",
         "@com_github_minio_minio_go_v7//pkg/credentials",
index eccb51bf1ccdd0cebb85f9a31897d583368e509c..541f6f27b305064025dde639c42ba0679e6eb0f2 100644 (file)
@@ -31,15 +31,10 @@ import (
        "go.fuhry.dev/runtime/sd"
        "go.fuhry.dev/runtime/utils/hostname"
        "go.fuhry.dev/runtime/utils/log"
+       "go.fuhry.dev/runtime/utils/option"
 )
 
-type Option interface {
-       Apply(*ephsServicer) error
-}
-
-type genericOption struct {
-       apply func(*ephsServicer) error
-}
+type Option = option.Option[*ephsServicer]
 
 type msgWithPath interface {
        GetPath() string
@@ -56,19 +51,15 @@ type ephsServicer struct {
        s3Client *minio.Client
 }
 
-var cell string
+var _ ephs_proto.EphsServer = &ephsServicer{}
 
-func (o *genericOption) Apply(servicer *ephsServicer) error {
-       return o.apply(servicer)
-}
+var cell string
 
 func WithAcl(acl *Acl) Option {
-       return &genericOption{
-               apply: func(s *ephsServicer) error {
-                       s.acl = acl
-                       return nil
-               },
-       }
+       return option.NewOption(func(s *ephsServicer) error {
+               s.acl = acl
+               return nil
+       })
 }
 
 func (s *ephsServicer) watchAcl(aclFile string) {
@@ -87,20 +78,18 @@ func (s *ephsServicer) watchAcl(aclFile string) {
 }
 
 func WithAclFile(aclFile string) Option {
-       return &genericOption{
-               apply: func(s *ephsServicer) error {
-                       acl, err := LoadAcl(aclFile)
-                       if err != nil {
-                               return err
-                       }
-                       s.logger.Noticef("loaded %d ACL rules from %s",
-                               len(acl.Rules),
-                               aclFile)
-                       s.acl = acl
-                       s.watchAcl(aclFile)
-                       return nil
-               },
-       }
+       return option.NewOption(func(s *ephsServicer) error {
+               acl, err := LoadAcl(aclFile)
+               if err != nil {
+                       return err
+               }
+               s.logger.Noticef("loaded %d ACL rules from %s",
+                       len(acl.Rules),
+                       aclFile)
+               s.acl = acl
+               s.watchAcl(aclFile)
+               return nil
+       })
 }
 
 func NewEphsServicer(opts ...Option) (ephs_proto.EphsServer, error) {
index 7e89d3a1a4d8922e86a96af7c3e4e11d0b7a23de..767ab9242757166d4d4c15db7c87c3c7bc3fdf9c 100644 (file)
@@ -21,6 +21,7 @@ go_library(
         "//net/dns",
         "//sd",
         "//utils/context",
+        "//utils/option",
         "@org_golang_google_grpc//:grpc",
     ],
 )
index 60d0edf9f12d4bdb071842636e5b026188d4890f..b981af5a50fc103de468867998eba7c321035f99 100644 (file)
@@ -8,6 +8,7 @@ import (
        "go.fuhry.dev/runtime/grpc/internal/common"
        "go.fuhry.dev/runtime/mtls"
        "go.fuhry.dev/runtime/sd"
+       "go.fuhry.dev/runtime/utils/option"
        "google.golang.org/grpc"
 )
 
@@ -18,22 +19,12 @@ type Client interface {
        Conn() (*grpc.ClientConn, error)
 }
 
-type ClientOption interface {
-       apply(*client) error
-}
-
-type clientOption struct {
-       f func(*client) error
-}
+type ClientOption = option.Option[*client]
 
 type AddressProvider interface {
        GetAddrs(context.Context) ([]sd.ServiceAddress, error)
 }
 
-func (o *clientOption) apply(c *client) error {
-       return o.f(c)
-}
-
 type client struct {
        ctx      context.Context
        serverId mtls.Identity
@@ -51,21 +42,17 @@ func (s *staticAddressProvider) GetAddrs(_ context.Context) ([]sd.ServiceAddress
 }
 
 func WithConnectionFactory(fac common.ConnectionFactory) ClientOption {
-       return &clientOption{
-               f: func(c *client) error {
-                       c.connFac = fac
-                       return nil
-               },
-       }
+       return option.NewOption(func(c *client) error {
+               c.connFac = fac
+               return nil
+       })
 }
 
 func WithAddressProvider(ap AddressProvider) ClientOption {
-       return &clientOption{
-               f: func(c *client) error {
-                       c.watcher = ap
-                       return nil
-               },
-       }
+       return option.NewOption(func(c *client) error {
+               c.watcher = ap
+               return nil
+       })
 }
 
 func WithStaticAddress(addresses ...*net.TCPAddr) ClientOption {
@@ -85,27 +72,23 @@ func WithStaticAddress(addresses ...*net.TCPAddr) ClientOption {
                })
        }
 
-       return &clientOption{
-               f: func(c *client) error {
-                       c.watcher = &staticAddressProvider{
-                               addresses: addrs,
-                       }
-                       return nil
-               },
-       }
+       return option.NewOption(func(c *client) error {
+               c.watcher = &staticAddressProvider{
+                       addresses: addrs,
+               }
+               return nil
+       })
 }
 
 func WithDNSSRV() ClientOption {
-       return &clientOption{
-               f: func(c *client) error {
-                       ap := &dnsSrvAddressProvider{
-                               serverId: c.serverId,
-                       }
-
-                       c.watcher = ap
-                       return nil
-               },
-       }
+       return option.NewOption(func(c *client) error {
+               ap := &dnsSrvAddressProvider{
+                       serverId: c.serverId,
+               }
+
+               c.watcher = ap
+               return nil
+       })
 }
 
 func NewGrpcClient(ctx context.Context, serverId, clientId mtls.Identity, opts ...ClientOption) (Client, error) {
@@ -117,7 +100,7 @@ func NewGrpcClient(ctx context.Context, serverId, clientId mtls.Identity, opts .
        }
 
        for _, opt := range opts {
-               if err := opt.apply(cl); err != nil {
+               if err := opt.Apply(cl); err != nil {
                        return nil, err
                }
        }
index b7a1bb2a3a48e891bc5fb03762702483cf3d390e..c2b13a3d0bb94e083f20973dd8e51aa54c947c1a 100644 (file)
@@ -17,6 +17,7 @@ go_library(
         "//sd",
         "//utils/hostname",
         "//utils/log",
+        "//utils/option",
         "@com_github_hashicorp_golang_lru_v2//:golang-lru",
         "@dev_fuhry_go_grpc_quic//:grpc-quic",
         "@org_golang_google_grpc//:grpc",
index 93881d34a5670794d73f7deece3159f13cf98ec3..49b273a1affd4806687c485a4221ed0caf0d84d0 100644 (file)
@@ -19,6 +19,7 @@ import (
        "go.fuhry.dev/runtime/sd"
        "go.fuhry.dev/runtime/utils/hostname"
        "go.fuhry.dev/runtime/utils/log"
+       "go.fuhry.dev/runtime/utils/option"
        "google.golang.org/grpc"
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/credentials"
@@ -40,25 +41,13 @@ type Server struct {
        hc         HealthCheckServicer
 }
 
-type ServerOption interface {
-       apply(*Server) error
-}
-
-type serverOption struct {
-       callback func(*Server) error
-}
-
-func (o *serverOption) apply(s *Server) error {
-       return o.callback(s)
-}
+type ServerOption = option.Option[*Server]
 
 func WithTransport(cf common.ConnectionFactory) ServerOption {
-       return &serverOption{
-               callback: func(s *Server) error {
-                       s.connFac = cf
-                       return nil
-               },
-       }
+       return option.NewOption(func(s *Server) error {
+               s.connFac = cf
+               return nil
+       })
 }
 
 var defaultPort *uint
@@ -114,7 +103,7 @@ func NewGrpcServerWithPort(id mtls.Identity, port uint16, opts ...ServerOption)
        }
 
        for _, opt := range opts {
-               if err := opt.apply(server); err != nil {
+               if err := opt.Apply(server); err != nil {
                        return nil, err
                }
        }
index 487417a941cdefc1b266e81ba13ee50232bd0822..b962c382db860efe947c05c2510ec42beb49c6d5 100644 (file)
@@ -30,7 +30,7 @@ func NewDefaultEtcdClient() (*etcd_client.Client, error) {
 
        logger := log.WithPrefix("etcd-client")
 
-       id := mtls.ParseIdentity(etcdMtlsId)
+       var id mtls.Identity = mtls.ParseIdentity(etcdMtlsId)
        if !id.IsValid() {
                id = mtls.DefaultIdentity()
 
index cdda3b3fb83f12fb4416edc9424831e3f5b84266..9b57c0735de465a0bd8038cafd4b7f3a9101d475 100644 (file)
@@ -85,9 +85,8 @@ func (w *SDWatcher) GetAddrs(ctx context.Context) ([]ServiceAddress, error) {
        }
 
        w.starter.Do(func() {
-               go w.watch(ctx)
-
                w.wg.Add(1)
+               go w.watch(ctx)
                w.wg.Wait()
        })
 
@@ -99,6 +98,10 @@ func (w *SDWatcher) GetAddrs(ctx context.Context) ([]ServiceAddress, error) {
 }
 
 func (w *SDWatcher) watch(ctx context.Context) {
+       // doneOnce is used to call wg.Done() either as soon as the first result is published,
+       // or when we have to return due to a fatal error
+       var doneOnce sync.Once
+       defer doneOnce.Do(w.wg.Done)
        kvs := make(map[string][]byte, 0)
 
        w.logger.V(1).Infof("Watching for service publications under path %s", w.prefix())
@@ -110,8 +113,7 @@ func (w *SDWatcher) watch(ctx context.Context) {
                }
 
                w.publishResults(kvs)
-               w.wg.Done()
-
+               doneOnce.Do(w.wg.Done)
        }
 
        watcher := etcd_client.NewWatcher(w.EtcdClient)
@@ -131,6 +133,7 @@ func (w *SDWatcher) watch(ctx context.Context) {
                        }
 
                        w.publishResults(kvs)
+                       doneOnce.Do(w.wg.Done)
                case <-ctx.Done():
                        return
                }
diff --git a/utils/option/BUILD.bazel b/utils/option/BUILD.bazel
new file mode 100644 (file)
index 0000000..8f8fe87
--- /dev/null
@@ -0,0 +1,8 @@
+load("@rules_go//go:def.bzl", "go_library")
+
+go_library(
+    name = "option",
+    srcs = ["option.go"],
+    importpath = "go.fuhry.dev/runtime/utils/option",
+    visibility = ["//visibility:public"],
+)
diff --git a/utils/option/option.go b/utils/option/option.go
new file mode 100644 (file)
index 0000000..4d20871
--- /dev/null
@@ -0,0 +1,19 @@
+package option
+
+type Option[T any] interface {
+       Apply(T) error
+}
+
+type optionImpl[T any] struct {
+       callback func(T) error
+}
+
+func NewOption[T any](callback func(T) error) Option[T] {
+       return &optionImpl[T]{
+               callback: callback,
+       }
+}
+
+func (o optionImpl[T]) Apply(t T) error {
+       return o.callback(t)
+}