From: Dan Fuhry Date: Wed, 19 Nov 2025 21:36:13 +0000 (-0500) Subject: Genericize "option" interface X-Git-Url: https://go.fuhry.dev/?a=commitdiff_plain;h=020d9e6d71477f864f3c3456eb39288fca98216c;p=runtime.git Genericize "option" interface "Option" variadic pattern has been used in lots of things lately - grpc client and server, ephs, etc. Although the implementation is trivial, the repetition was becoming obnoxious. Other trivial fixes: - Fixed sd watcher wait group never calling Done if etcd client context is canceled - Fixed etcd_factory type declaration --- diff --git a/ephs/BUILD.bazel b/ephs/BUILD.bazel index 242cfba..8c1413f 100644 --- a/ephs/BUILD.bazel +++ b/ephs/BUILD.bazel @@ -10,7 +10,9 @@ go_library( "//grpc/internal/common", "//mtls", "//proto/service/ephs", + "//utils/context", "//utils/log", + "//utils/option", "@com_github_quic_go_quic_go//:quic-go", "@org_golang_google_grpc//codes", "@org_golang_google_grpc//status", diff --git a/ephs/client.go b/ephs/client.go index 0ca364b..4f3c55b 100644 --- a/ephs/client.go +++ b/ephs/client.go @@ -2,7 +2,6 @@ package ephs import ( "bytes" - "context" "errors" "fmt" "io" @@ -16,7 +15,9 @@ import ( grpc_common "go.fuhry.dev/runtime/grpc/internal/common" "go.fuhry.dev/runtime/mtls" ephs_pb "go.fuhry.dev/runtime/proto/service/ephs" + "go.fuhry.dev/runtime/utils/context" "go.fuhry.dev/runtime/utils/log" + "go.fuhry.dev/runtime/utils/option" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -47,13 +48,7 @@ type Client interface { Watch(ctx context.Context, path string) (<-chan *ephs_pb.WatchResponse, error) } -type ClientOption interface { - Apply(*clientImpl) error -} - -type genericClientOption struct { - apply func(*clientImpl) error -} +type ClientOption = option.Option[*clientImpl] type clientImpl struct { defaultCtx context.Context @@ -131,17 +126,11 @@ func FormatFsEntry(e *ephs_pb.FsEntry) string { return out } -func (o *genericClientOption) Apply(c *clientImpl) error { - return o.apply(c) -} - func WithDefaultTimeout(d time.Duration) ClientOption { - return &genericClientOption{ - apply: func(c *clientImpl) error { - c.defaultTimeout = d - return nil - }, - } + return option.NewOption(func(c *clientImpl) error { + c.defaultTimeout = d + return nil + }) } func DefaultClient() (Client, error) { @@ -153,7 +142,7 @@ func DefaultClient() (Client, error) { } if DefaultClientContext == nil { - DefaultClientContext = context.Background() + DefaultClientContext, _ = context.Interruptible() } client, err := NewClient(DefaultClientContext, mtls.DefaultIdentity()) @@ -412,7 +401,7 @@ func (c *clientImpl) watch(origCtx context.Context, origPath string, stream ephs func (c *clientImpl) grpcClient() (ephs_pb.EphsClient, error) { var err error - serverId := mtls.NewServiceIdentity("ephs") + serverId := mtls.NewRemoteServiceIdentity("ephs") if c.client == nil { c.client, err = grpc.NewGrpcClient(c.defaultCtx, serverId, c.id, diff --git a/ephs/servicer/BUILD.bazel b/ephs/servicer/BUILD.bazel index 0a8aae7..cff5bc5 100644 --- a/ephs/servicer/BUILD.bazel +++ b/ephs/servicer/BUILD.bazel @@ -21,6 +21,7 @@ go_library( "//sd", "//utils/hostname", "//utils/log", + "//utils/option", "//utils/stringmatch", "@com_github_minio_minio_go_v7//:minio-go", "@com_github_minio_minio_go_v7//pkg/credentials", diff --git a/ephs/servicer/servicer.go b/ephs/servicer/servicer.go index eccb51b..541f6f2 100644 --- a/ephs/servicer/servicer.go +++ b/ephs/servicer/servicer.go @@ -31,15 +31,10 @@ import ( "go.fuhry.dev/runtime/sd" "go.fuhry.dev/runtime/utils/hostname" "go.fuhry.dev/runtime/utils/log" + "go.fuhry.dev/runtime/utils/option" ) -type Option interface { - Apply(*ephsServicer) error -} - -type genericOption struct { - apply func(*ephsServicer) error -} +type Option = option.Option[*ephsServicer] type msgWithPath interface { GetPath() string @@ -56,19 +51,15 @@ type ephsServicer struct { s3Client *minio.Client } -var cell string +var _ ephs_proto.EphsServer = &ephsServicer{} -func (o *genericOption) Apply(servicer *ephsServicer) error { - return o.apply(servicer) -} +var cell string func WithAcl(acl *Acl) Option { - return &genericOption{ - apply: func(s *ephsServicer) error { - s.acl = acl - return nil - }, - } + return option.NewOption(func(s *ephsServicer) error { + s.acl = acl + return nil + }) } func (s *ephsServicer) watchAcl(aclFile string) { @@ -87,20 +78,18 @@ func (s *ephsServicer) watchAcl(aclFile string) { } func WithAclFile(aclFile string) Option { - return &genericOption{ - apply: func(s *ephsServicer) error { - acl, err := LoadAcl(aclFile) - if err != nil { - return err - } - s.logger.Noticef("loaded %d ACL rules from %s", - len(acl.Rules), - aclFile) - s.acl = acl - s.watchAcl(aclFile) - return nil - }, - } + return option.NewOption(func(s *ephsServicer) error { + acl, err := LoadAcl(aclFile) + if err != nil { + return err + } + s.logger.Noticef("loaded %d ACL rules from %s", + len(acl.Rules), + aclFile) + s.acl = acl + s.watchAcl(aclFile) + return nil + }) } func NewEphsServicer(opts ...Option) (ephs_proto.EphsServer, error) { diff --git a/grpc/internal/client/BUILD.bazel b/grpc/internal/client/BUILD.bazel index 7e89d3a..767ab92 100644 --- a/grpc/internal/client/BUILD.bazel +++ b/grpc/internal/client/BUILD.bazel @@ -21,6 +21,7 @@ go_library( "//net/dns", "//sd", "//utils/context", + "//utils/option", "@org_golang_google_grpc//:grpc", ], ) diff --git a/grpc/internal/client/client.go b/grpc/internal/client/client.go index 60d0edf..b981af5 100644 --- a/grpc/internal/client/client.go +++ b/grpc/internal/client/client.go @@ -8,6 +8,7 @@ import ( "go.fuhry.dev/runtime/grpc/internal/common" "go.fuhry.dev/runtime/mtls" "go.fuhry.dev/runtime/sd" + "go.fuhry.dev/runtime/utils/option" "google.golang.org/grpc" ) @@ -18,22 +19,12 @@ type Client interface { Conn() (*grpc.ClientConn, error) } -type ClientOption interface { - apply(*client) error -} - -type clientOption struct { - f func(*client) error -} +type ClientOption = option.Option[*client] type AddressProvider interface { GetAddrs(context.Context) ([]sd.ServiceAddress, error) } -func (o *clientOption) apply(c *client) error { - return o.f(c) -} - type client struct { ctx context.Context serverId mtls.Identity @@ -51,21 +42,17 @@ func (s *staticAddressProvider) GetAddrs(_ context.Context) ([]sd.ServiceAddress } func WithConnectionFactory(fac common.ConnectionFactory) ClientOption { - return &clientOption{ - f: func(c *client) error { - c.connFac = fac - return nil - }, - } + return option.NewOption(func(c *client) error { + c.connFac = fac + return nil + }) } func WithAddressProvider(ap AddressProvider) ClientOption { - return &clientOption{ - f: func(c *client) error { - c.watcher = ap - return nil - }, - } + return option.NewOption(func(c *client) error { + c.watcher = ap + return nil + }) } func WithStaticAddress(addresses ...*net.TCPAddr) ClientOption { @@ -85,27 +72,23 @@ func WithStaticAddress(addresses ...*net.TCPAddr) ClientOption { }) } - return &clientOption{ - f: func(c *client) error { - c.watcher = &staticAddressProvider{ - addresses: addrs, - } - return nil - }, - } + return option.NewOption(func(c *client) error { + c.watcher = &staticAddressProvider{ + addresses: addrs, + } + return nil + }) } func WithDNSSRV() ClientOption { - return &clientOption{ - f: func(c *client) error { - ap := &dnsSrvAddressProvider{ - serverId: c.serverId, - } - - c.watcher = ap - return nil - }, - } + return option.NewOption(func(c *client) error { + ap := &dnsSrvAddressProvider{ + serverId: c.serverId, + } + + c.watcher = ap + return nil + }) } func NewGrpcClient(ctx context.Context, serverId, clientId mtls.Identity, opts ...ClientOption) (Client, error) { @@ -117,7 +100,7 @@ func NewGrpcClient(ctx context.Context, serverId, clientId mtls.Identity, opts . } for _, opt := range opts { - if err := opt.apply(cl); err != nil { + if err := opt.Apply(cl); err != nil { return nil, err } } diff --git a/grpc/internal/server/BUILD.bazel b/grpc/internal/server/BUILD.bazel index b7a1bb2..c2b13a3 100644 --- a/grpc/internal/server/BUILD.bazel +++ b/grpc/internal/server/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "//sd", "//utils/hostname", "//utils/log", + "//utils/option", "@com_github_hashicorp_golang_lru_v2//:golang-lru", "@dev_fuhry_go_grpc_quic//:grpc-quic", "@org_golang_google_grpc//:grpc", diff --git a/grpc/internal/server/server.go b/grpc/internal/server/server.go index 93881d3..49b273a 100644 --- a/grpc/internal/server/server.go +++ b/grpc/internal/server/server.go @@ -19,6 +19,7 @@ import ( "go.fuhry.dev/runtime/sd" "go.fuhry.dev/runtime/utils/hostname" "go.fuhry.dev/runtime/utils/log" + "go.fuhry.dev/runtime/utils/option" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" @@ -40,25 +41,13 @@ type Server struct { hc HealthCheckServicer } -type ServerOption interface { - apply(*Server) error -} - -type serverOption struct { - callback func(*Server) error -} - -func (o *serverOption) apply(s *Server) error { - return o.callback(s) -} +type ServerOption = option.Option[*Server] func WithTransport(cf common.ConnectionFactory) ServerOption { - return &serverOption{ - callback: func(s *Server) error { - s.connFac = cf - return nil - }, - } + return option.NewOption(func(s *Server) error { + s.connFac = cf + return nil + }) } var defaultPort *uint @@ -114,7 +103,7 @@ func NewGrpcServerWithPort(id mtls.Identity, port uint16, opts ...ServerOption) } for _, opt := range opts { - if err := opt.apply(server); err != nil { + if err := opt.Apply(server); err != nil { return nil, err } } diff --git a/sd/etcd_factory.go b/sd/etcd_factory.go index 487417a..b962c38 100644 --- a/sd/etcd_factory.go +++ b/sd/etcd_factory.go @@ -30,7 +30,7 @@ func NewDefaultEtcdClient() (*etcd_client.Client, error) { logger := log.WithPrefix("etcd-client") - id := mtls.ParseIdentity(etcdMtlsId) + var id mtls.Identity = mtls.ParseIdentity(etcdMtlsId) if !id.IsValid() { id = mtls.DefaultIdentity() diff --git a/sd/watcher.go b/sd/watcher.go index cdda3b3..9b57c07 100644 --- a/sd/watcher.go +++ b/sd/watcher.go @@ -85,9 +85,8 @@ func (w *SDWatcher) GetAddrs(ctx context.Context) ([]ServiceAddress, error) { } w.starter.Do(func() { - go w.watch(ctx) - w.wg.Add(1) + go w.watch(ctx) w.wg.Wait() }) @@ -99,6 +98,10 @@ func (w *SDWatcher) GetAddrs(ctx context.Context) ([]ServiceAddress, error) { } func (w *SDWatcher) watch(ctx context.Context) { + // doneOnce is used to call wg.Done() either as soon as the first result is published, + // or when we have to return due to a fatal error + var doneOnce sync.Once + defer doneOnce.Do(w.wg.Done) kvs := make(map[string][]byte, 0) w.logger.V(1).Infof("Watching for service publications under path %s", w.prefix()) @@ -110,8 +113,7 @@ func (w *SDWatcher) watch(ctx context.Context) { } w.publishResults(kvs) - w.wg.Done() - + doneOnce.Do(w.wg.Done) } watcher := etcd_client.NewWatcher(w.EtcdClient) @@ -131,6 +133,7 @@ func (w *SDWatcher) watch(ctx context.Context) { } w.publishResults(kvs) + doneOnce.Do(w.wg.Done) case <-ctx.Done(): return } diff --git a/utils/option/BUILD.bazel b/utils/option/BUILD.bazel new file mode 100644 index 0000000..8f8fe87 --- /dev/null +++ b/utils/option/BUILD.bazel @@ -0,0 +1,8 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "option", + srcs = ["option.go"], + importpath = "go.fuhry.dev/runtime/utils/option", + visibility = ["//visibility:public"], +) diff --git a/utils/option/option.go b/utils/option/option.go new file mode 100644 index 0000000..4d20871 --- /dev/null +++ b/utils/option/option.go @@ -0,0 +1,19 @@ +package option + +type Option[T any] interface { + Apply(T) error +} + +type optionImpl[T any] struct { + callback func(T) error +} + +func NewOption[T any](callback func(T) error) Option[T] { + return &optionImpl[T]{ + callback: callback, + } +} + +func (o optionImpl[T]) Apply(t T) error { + return o.callback(t) +}