"Option" variadic pattern has been used in lots of things lately - grpc client and server, ephs, etc. Although the implementation is trivial, the repetition was becoming obnoxious.
Other trivial fixes:
- Fixed sd watcher wait group never calling Done if etcd client context is canceled
- Fixed etcd_factory type declaration
"//grpc/internal/common",
"//mtls",
"//proto/service/ephs",
+ "//utils/context",
"//utils/log",
+ "//utils/option",
"@com_github_quic_go_quic_go//:quic-go",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
import (
"bytes"
- "context"
"errors"
"fmt"
"io"
grpc_common "go.fuhry.dev/runtime/grpc/internal/common"
"go.fuhry.dev/runtime/mtls"
ephs_pb "go.fuhry.dev/runtime/proto/service/ephs"
+ "go.fuhry.dev/runtime/utils/context"
"go.fuhry.dev/runtime/utils/log"
+ "go.fuhry.dev/runtime/utils/option"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Watch(ctx context.Context, path string) (<-chan *ephs_pb.WatchResponse, error)
}
-type ClientOption interface {
- Apply(*clientImpl) error
-}
-
-type genericClientOption struct {
- apply func(*clientImpl) error
-}
+type ClientOption = option.Option[*clientImpl]
type clientImpl struct {
defaultCtx context.Context
return out
}
-func (o *genericClientOption) Apply(c *clientImpl) error {
- return o.apply(c)
-}
-
func WithDefaultTimeout(d time.Duration) ClientOption {
- return &genericClientOption{
- apply: func(c *clientImpl) error {
- c.defaultTimeout = d
- return nil
- },
- }
+ return option.NewOption(func(c *clientImpl) error {
+ c.defaultTimeout = d
+ return nil
+ })
}
func DefaultClient() (Client, error) {
}
if DefaultClientContext == nil {
- DefaultClientContext = context.Background()
+ DefaultClientContext, _ = context.Interruptible()
}
client, err := NewClient(DefaultClientContext, mtls.DefaultIdentity())
func (c *clientImpl) grpcClient() (ephs_pb.EphsClient, error) {
var err error
- serverId := mtls.NewServiceIdentity("ephs")
+ serverId := mtls.NewRemoteServiceIdentity("ephs")
if c.client == nil {
c.client, err = grpc.NewGrpcClient(c.defaultCtx, serverId, c.id,
"//sd",
"//utils/hostname",
"//utils/log",
+ "//utils/option",
"//utils/stringmatch",
"@com_github_minio_minio_go_v7//:minio-go",
"@com_github_minio_minio_go_v7//pkg/credentials",
"go.fuhry.dev/runtime/sd"
"go.fuhry.dev/runtime/utils/hostname"
"go.fuhry.dev/runtime/utils/log"
+ "go.fuhry.dev/runtime/utils/option"
)
-type Option interface {
- Apply(*ephsServicer) error
-}
-
-type genericOption struct {
- apply func(*ephsServicer) error
-}
+type Option = option.Option[*ephsServicer]
type msgWithPath interface {
GetPath() string
s3Client *minio.Client
}
-var cell string
+var _ ephs_proto.EphsServer = &ephsServicer{}
-func (o *genericOption) Apply(servicer *ephsServicer) error {
- return o.apply(servicer)
-}
+var cell string
func WithAcl(acl *Acl) Option {
- return &genericOption{
- apply: func(s *ephsServicer) error {
- s.acl = acl
- return nil
- },
- }
+ return option.NewOption(func(s *ephsServicer) error {
+ s.acl = acl
+ return nil
+ })
}
func (s *ephsServicer) watchAcl(aclFile string) {
}
func WithAclFile(aclFile string) Option {
- return &genericOption{
- apply: func(s *ephsServicer) error {
- acl, err := LoadAcl(aclFile)
- if err != nil {
- return err
- }
- s.logger.Noticef("loaded %d ACL rules from %s",
- len(acl.Rules),
- aclFile)
- s.acl = acl
- s.watchAcl(aclFile)
- return nil
- },
- }
+ return option.NewOption(func(s *ephsServicer) error {
+ acl, err := LoadAcl(aclFile)
+ if err != nil {
+ return err
+ }
+ s.logger.Noticef("loaded %d ACL rules from %s",
+ len(acl.Rules),
+ aclFile)
+ s.acl = acl
+ s.watchAcl(aclFile)
+ return nil
+ })
}
func NewEphsServicer(opts ...Option) (ephs_proto.EphsServer, error) {
"//net/dns",
"//sd",
"//utils/context",
+ "//utils/option",
"@org_golang_google_grpc//:grpc",
],
)
"go.fuhry.dev/runtime/grpc/internal/common"
"go.fuhry.dev/runtime/mtls"
"go.fuhry.dev/runtime/sd"
+ "go.fuhry.dev/runtime/utils/option"
"google.golang.org/grpc"
)
Conn() (*grpc.ClientConn, error)
}
-type ClientOption interface {
- apply(*client) error
-}
-
-type clientOption struct {
- f func(*client) error
-}
+type ClientOption = option.Option[*client]
type AddressProvider interface {
GetAddrs(context.Context) ([]sd.ServiceAddress, error)
}
-func (o *clientOption) apply(c *client) error {
- return o.f(c)
-}
-
type client struct {
ctx context.Context
serverId mtls.Identity
}
func WithConnectionFactory(fac common.ConnectionFactory) ClientOption {
- return &clientOption{
- f: func(c *client) error {
- c.connFac = fac
- return nil
- },
- }
+ return option.NewOption(func(c *client) error {
+ c.connFac = fac
+ return nil
+ })
}
func WithAddressProvider(ap AddressProvider) ClientOption {
- return &clientOption{
- f: func(c *client) error {
- c.watcher = ap
- return nil
- },
- }
+ return option.NewOption(func(c *client) error {
+ c.watcher = ap
+ return nil
+ })
}
func WithStaticAddress(addresses ...*net.TCPAddr) ClientOption {
})
}
- return &clientOption{
- f: func(c *client) error {
- c.watcher = &staticAddressProvider{
- addresses: addrs,
- }
- return nil
- },
- }
+ return option.NewOption(func(c *client) error {
+ c.watcher = &staticAddressProvider{
+ addresses: addrs,
+ }
+ return nil
+ })
}
func WithDNSSRV() ClientOption {
- return &clientOption{
- f: func(c *client) error {
- ap := &dnsSrvAddressProvider{
- serverId: c.serverId,
- }
-
- c.watcher = ap
- return nil
- },
- }
+ return option.NewOption(func(c *client) error {
+ ap := &dnsSrvAddressProvider{
+ serverId: c.serverId,
+ }
+
+ c.watcher = ap
+ return nil
+ })
}
func NewGrpcClient(ctx context.Context, serverId, clientId mtls.Identity, opts ...ClientOption) (Client, error) {
}
for _, opt := range opts {
- if err := opt.apply(cl); err != nil {
+ if err := opt.Apply(cl); err != nil {
return nil, err
}
}
"//sd",
"//utils/hostname",
"//utils/log",
+ "//utils/option",
"@com_github_hashicorp_golang_lru_v2//:golang-lru",
"@dev_fuhry_go_grpc_quic//:grpc-quic",
"@org_golang_google_grpc//:grpc",
"go.fuhry.dev/runtime/sd"
"go.fuhry.dev/runtime/utils/hostname"
"go.fuhry.dev/runtime/utils/log"
+ "go.fuhry.dev/runtime/utils/option"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
hc HealthCheckServicer
}
-type ServerOption interface {
- apply(*Server) error
-}
-
-type serverOption struct {
- callback func(*Server) error
-}
-
-func (o *serverOption) apply(s *Server) error {
- return o.callback(s)
-}
+type ServerOption = option.Option[*Server]
func WithTransport(cf common.ConnectionFactory) ServerOption {
- return &serverOption{
- callback: func(s *Server) error {
- s.connFac = cf
- return nil
- },
- }
+ return option.NewOption(func(s *Server) error {
+ s.connFac = cf
+ return nil
+ })
}
var defaultPort *uint
}
for _, opt := range opts {
- if err := opt.apply(server); err != nil {
+ if err := opt.Apply(server); err != nil {
return nil, err
}
}
logger := log.WithPrefix("etcd-client")
- id := mtls.ParseIdentity(etcdMtlsId)
+ var id mtls.Identity = mtls.ParseIdentity(etcdMtlsId)
if !id.IsValid() {
id = mtls.DefaultIdentity()
}
w.starter.Do(func() {
- go w.watch(ctx)
-
w.wg.Add(1)
+ go w.watch(ctx)
w.wg.Wait()
})
}
func (w *SDWatcher) watch(ctx context.Context) {
+ // doneOnce is used to call wg.Done() either as soon as the first result is published,
+ // or when we have to return due to a fatal error
+ var doneOnce sync.Once
+ defer doneOnce.Do(w.wg.Done)
kvs := make(map[string][]byte, 0)
w.logger.V(1).Infof("Watching for service publications under path %s", w.prefix())
}
w.publishResults(kvs)
- w.wg.Done()
-
+ doneOnce.Do(w.wg.Done)
}
watcher := etcd_client.NewWatcher(w.EtcdClient)
}
w.publishResults(kvs)
+ doneOnce.Do(w.wg.Done)
case <-ctx.Done():
return
}
--- /dev/null
+load("@rules_go//go:def.bzl", "go_library")
+
+go_library(
+ name = "option",
+ srcs = ["option.go"],
+ importpath = "go.fuhry.dev/runtime/utils/option",
+ visibility = ["//visibility:public"],
+)
--- /dev/null
+package option
+
+type Option[T any] interface {
+ Apply(T) error
+}
+
+type optionImpl[T any] struct {
+ callback func(T) error
+}
+
+func NewOption[T any](callback func(T) error) Option[T] {
+ return &optionImpl[T]{
+ callback: callback,
+ }
+}
+
+func (o optionImpl[T]) Apply(t T) error {
+ return o.callback(t)
+}