From: Dan Fuhry
Date: Fri, 14 Nov 2025 18:00:00 +0000 (-0500)
Subject: metricbus: abstract transport
X-Git-Url: https://go.fuhry.dev/?a=commitdiff_plain;h=80a25fc6f54f506578fd9de4f625f019ec0bb7bf;p=runtime.git

metricbus: abstract transport

- rename metricbus/internal -> metricbus/mbserver due to import from //cmd/metricbus_server
- add httpserver transport
- add utils/rollout library for... controlling rollouts
---

diff --git a/metrics/metricbus/mbclient/BUILD.bazel b/metrics/metricbus/mbclient/BUILD.bazel
index 27d91d0..7cbdcec 100644
--- a/metrics/metricbus/mbclient/BUILD.bazel
+++ b/metrics/metricbus/mbclient/BUILD.bazel
@@ -17,7 +17,7 @@ go_library(
     deps = [
         "//constants",
         "//metrics/metricbus",
-        "//metrics/metricbus/internal",
+        "//metrics/metricbus/mbserver",
         "//mtls",
         "//sd",
         "//utils/hostname",
diff --git a/metrics/metricbus/mbclient/common.go b/metrics/metricbus/mbclient/common.go
new file mode 100644
index 0000000..a7eb2ff
--- /dev/null
+++ b/metrics/metricbus/mbclient/common.go
@@ -0,0 +1,27 @@
+package mbclient
+
+import (
+	"context"
+
+	"go.fuhry.dev/runtime/utils/rollout"
+)
+
+// httpRollout gates the move of metricbus clients from the D-Bus transport to
+// the HTTP server transport. Its default is 0%.
+var httpRollout = rollout.New("metricbus_dbus_to_http", &rollout.Opts{Pct: 0})
+
+// NewService returns a MetricBusService for the default instance
+// discriminator, using whichever transport the rollout selects.
+func NewService(ctx context.Context) MetricBusService {
+	return NewServiceWithDiscriminator(ctx, defaultMetricBusServiceDiscriminator)
+}
+
+// NewServiceWithDiscriminator returns a MetricBusService for the given
+// instance discriminator. The HTTP transport is used when httpRollout is
+// enabled for this call; otherwise the D-Bus transport is used.
+func NewServiceWithDiscriminator(ctx context.Context, instanceDiscriminator string) MetricBusService {
+	if httpRollout.Enabled() {
+		return NewHTTPServiceWithDiscriminator(ctx, instanceDiscriminator)
+	}
+	return NewDBusServiceWithDiscriminator(ctx, instanceDiscriminator)
+}
diff --git a/metrics/metricbus/mbclient/conn.go b/metrics/metricbus/mbclient/conn.go
index 2065ccd..d619ea3 100644
--- a/metrics/metricbus/mbclient/conn.go
+++ b/metrics/metricbus/mbclient/conn.go
@@ -7,7 +7,7 @@ import (
 	"github.com/godbus/dbus/v5"
 
 	"go.fuhry.dev/runtime/metrics/metricbus"
-	mbinternal "go.fuhry.dev/runtime/metrics/metricbus/internal"
+	"go.fuhry.dev/runtime/metrics/metricbus/mbserver"
 	"go.fuhry.dev/runtime/utils/log"
 )
 
@@ -24,7 +24,7 @@ type metricBusLowLevelClient struct {
 }
 
 func newDbusConnection(ctx context.Context) (*dbus.Conn, error) {
-	conn, err := mbinternal.DbusConn()
+	conn, err := mbserver.DbusConn()
 	if err != nil {
 		return nil, err
 	}
diff --git a/metrics/metricbus/mbclient/httpserver_client.go b/metrics/metricbus/mbclient/httpserver_client.go
new file mode 100644
index 0000000..ee5aac5
--- /dev/null
+++ b/metrics/metricbus/mbclient/httpserver_client.go
@@ -0,0 +1,322 @@
+package mbclient
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"maps"
+	"net"
+	"net/http"
+	"slices"
+	"strings"
+	"sync"
+	"time"
+
+	"go.fuhry.dev/runtime/constants"
+	"go.fuhry.dev/runtime/metrics/metricbus"
+	"go.fuhry.dev/runtime/sd"
+	"go.fuhry.dev/runtime/utils/hostname"
+	"go.fuhry.dev/runtime/utils/log"
+)
+
+// httpRetryInterval is the delay between attempts to bind the listener or to
+// publish the service discovery record.
+const httpRetryInterval = 5 * time.Second
+
+// metricBusHttpServiceImpl is a MetricBusService that keeps metric values in
+// memory and serves them over HTTP in the Prometheus text format.
+type metricBusHttpServiceImpl struct {
+	name     string
+	instance string
+
+	sdp    *sd.SDPublisher
+	server *http.Server
+	log    log.Logger
+
+	ctx context.Context
+
+	metrics map[string]httpMetric
+
+	// mu guards metrics.
+	mu sync.Mutex
+}
+
+// NewHTTPService returns an HTTP-transport MetricBusService for the default
+// instance discriminator.
+func NewHTTPService(ctx context.Context) MetricBusService {
+	return NewHTTPServiceWithDiscriminator(ctx, defaultMetricBusServiceDiscriminator)
+}
+
+// NewHTTPServiceWithDiscriminator returns an HTTP-transport MetricBusService
+// for the given instance discriminator. It starts a goroutine that serves
+// /metrics and /healthz and advertises the listener through service discovery
+// until ctx is cancelled.
+func NewHTTPServiceWithDiscriminator(ctx context.Context, instanceDiscriminator string) MetricBusService {
+	logger := log.WithPrefix(fmt.Sprintf("metricbus.httpserver_client[%s]", defaultMetricBusServiceName))
+
+	mux := http.NewServeMux()
+	mw := log.NewLoggingMiddlewareWithLogger(mux, logger)
+
+	sdShard := defaultMetricBusServiceName
+	if instanceDiscriminator != metricbus.SingletonInstanceDiscriminator && instanceDiscriminator != "" {
+		sdShard += "-" + instanceDiscriminator
+	}
+
+	svc := &metricBusHttpServiceImpl{
+		name:     defaultMetricBusServiceName,
+		instance: instanceDiscriminator,
+
+		sdp: &sd.SDPublisher{
+			Protocol:  sd.ProtocolTCP,
+			Service:   "otel-tls",
+			ShardName: sdShard,
+		},
+		server: &http.Server{
+			Handler: mw.HandlerFunc(),
+		},
+		log: logger,
+
+		ctx: ctx,
+
+		metrics: make(map[string]httpMetric),
+	}
+
+	mux.HandleFunc("/metrics", svc.metricsToString)
+	mux.HandleFunc("/healthz", svc.healthCheck)
+
+	go svc.run()
+
+	return svc
+}
+
+// run binds the TLS listener, serves HTTP on it, publishes the service
+// discovery record and then blocks until svc.ctx is cancelled, at which point
+// it shuts the server down and unpublishes. Bind and publish failures are
+// retried every httpRetryInterval; each wait also ends when svc.ctx is
+// cancelled, so the goroutine does not outlive its context.
+func (svc *metricBusHttpServiceImpl) run() {
+	listener, tcpAddr, ok := svc.listen()
+	if !ok {
+		return
+	}
+
+	go func() {
+		// Serve always returns a non-nil error; ErrServerClosed is the
+		// expected result of the Shutdown call below.
+		if err := svc.server.Serve(listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
+			svc.log.Errorf("metrics http server exited: %v", err)
+		}
+	}()
+	svc.log.Noticef("serving metrics on %s", tcpAddr.String())
+	svc.sdp.AdvertisePort = uint16(tcpAddr.Port)
+
+	// If publishing was cut short by cancellation, go straight to shutdown.
+	if svc.publish() {
+		<-svc.ctx.Done()
+	}
+
+	shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	if err := svc.server.Shutdown(shutdownCtx); err != nil {
+		svc.log.Errorf("failed to shut down metrics http server: %v", err)
+	}
+	svc.sdp.Unpublish()
+}
+
+// listen calls makeTlsListener until it yields a listener bound to a TCP
+// address. It reports false if svc.ctx is cancelled first.
+func (svc *metricBusHttpServiceImpl) listen() (net.Listener, *net.TCPAddr, bool) {
+	for {
+		listener, addr, err := makeTlsListener(svc.ctx)
+		if err == nil {
+			if tcpAddr, isTCP := addr.(*net.TCPAddr); isTCP {
+				return listener, tcpAddr, true
+			}
+			listener.Close()
+			svc.log.Alertf("listener address is %T, not *net.TCPAddr", addr)
+		} else {
+			svc.log.Errorf("failed to init tls listener, retrying in %s: %v", httpRetryInterval, err)
+		}
+
+		if !svc.sleep(httpRetryInterval) {
+			return nil, nil, false
+		}
+	}
+}
+
+// publish publishes the service discovery record, retrying every
+// httpRetryInterval on failure. It reports false if svc.ctx is cancelled
+// before a publish succeeds.
+func (svc *metricBusHttpServiceImpl) publish() bool {
+	for {
+		err := svc.sdp.Publish(svc.ctx)
+		if err == nil {
+			return true
+		}
+		svc.log.Alertf("failed to publish sd service: %v", err)
+
+		if !svc.sleep(httpRetryInterval) {
+			return false
+		}
+	}
+}
+
+// sleep waits for d and reports true, or reports false as soon as svc.ctx is
+// cancelled.
+func (svc *metricBusHttpServiceImpl) sleep(d time.Duration) bool {
+	timer := time.NewTimer(d)
+	defer timer.Stop()
+
+	select {
+	case <-timer.C:
+		return true
+	case <-svc.ctx.Done():
+		return false
+	}
+}
+
+// metricsToString handles /metrics. It writes every defined metric, preceded
+// by its HELP and TYPE lines, in the Prometheus text format.
+func (s *metricBusHttpServiceImpl) metricsToString(w http.ResponseWriter, r *http.Request) {
+	w.Header().Set("content-type", "text/plain; version=0.0.4; charset=utf-8")
+
+	// DefineCounter and DefineGauge insert into s.metrics under s.mu, possibly
+	// while a scrape is running, so take a copy under the same lock. Writing
+	// happens after the unlock so a slow client cannot stall them.
+	s.mu.Lock()
+	metrics := maps.Clone(s.metrics)
+	s.mu.Unlock()
+
+	baseLabelValues := []string{
+		fmt.Sprintf("%s=%q", "_task", s.name),
+		fmt.Sprintf("%s=%q", "_host", hostname.Fqdn()),
+	}
+	if s.instance != metricbus.SingletonInstanceDiscriminator {
+		baseLabelValues = append(baseLabelValues, fmt.Sprintf("%s=%q", "_shard", s.instance))
+	}
+
+	// Map iteration order is random; sort the names so that consecutive
+	// scrapes list the metrics in the same order.
+	for _, metricName := range slices.Sorted(maps.Keys(metrics)) {
+		metric := metrics[metricName]
+
+		fmt.Fprintf(w, "# HELP %s %s\n", metricName, metric.Help())
+		fmt.Fprintf(w, "# TYPE %s %s\n", metricName, metric.Type())
+
+		for _, b := range metric.Get() {
+			labels := make([]string, 0, len(baseLabelValues)+len(b.KV))
+			labels = append(labels, baseLabelValues...)
+			labels = append(labels, b.KV...)
+
+			fmt.Fprintf(w, "%s{%s} %.2f\n", metricName, strings.Join(labels, ","), b.V)
+		}
+
+		fmt.Fprintf(w, "\n")
+	}
+}
+
+// healthCheck handles /healthz by replying 200 with a short banner that
+// names the organization.
+func (s *metricBusHttpServiceImpl) healthCheck(w http.ResponseWriter, r *http.Request) {
+	w.WriteHeader(http.StatusOK)
+	// NOTE(review): this banner looks like a page whose markup was lost; only
+	// the text that survived is kept. Confirm the intended body.
+	fmt.Fprintf(w, "\n\n%s Metric Collector\n\n", constants.OrgName)
+}
+
+// Flush does nothing: values are read from memory on every scrape, so there
+// is nothing to push.
+func (*metricBusHttpServiceImpl) Flush() {
+}
+
+// FlushAndWait does nothing, for the same reason as Flush.
+func (*metricBusHttpServiceImpl) FlushAndWait() {
+}
+
+// DefineCounter returns the counter registered as metricName, creating it on
+// first use. Redefining metricName as another metric type or with different
+// label names is reported through s.log.Panicf.
+func (s *metricBusHttpServiceImpl) DefineCounter(
+	metricName, descr string,
+	labelNames ...string,
+) CounterMetric {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	if m, ok := s.metrics[metricName]; ok {
+		if _, ok := m.(*httpCounterMetric); !ok {
+			s.log.Panicf(
+				"metric %q already defined as different metric type: %T",
+				metricName,
+				m,
+			)
+		}
+
+		if !slices.Equal(m.LabelNames(), labelNames) {
+			s.log.Panicf(
+				"metric %q already defined with differing label names: %+v",
+				metricName,
+				labelNames,
+			)
+		}
+
+		return m.(*httpCounterMetric)
+	}
+
+	counter := &httpCounterMetric{
+		httpBaseMetric: &httpBaseMetric{
+			name:  metricName,
+			descr: descr,
+			// Cloned so later writes to the caller's slice cannot change
+			// the label set this metric was defined with.
+			labelNames: slices.Clone(labelNames),
+		},
+		values: make(map[bucketKey]*httpCounter),
+	}
+	s.metrics[metricName] = counter
+	return counter
+}
+
+// DefineGauge returns the gauge registered as metricName, creating it on
+// first use. Redefining metricName as another metric type or with different
+// label names is reported through s.log.Panicf.
+func (s *metricBusHttpServiceImpl) DefineGauge(
+	metricName, descr string,
+	labelNames ...string,
+) GaugeMetric {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	if m, ok := s.metrics[metricName]; ok {
+		if _, ok := m.(*httpGaugeMetric); !ok {
+			s.log.Panicf(
+				"metric %q already defined as different metric type: %T",
+				metricName,
+				m,
+			)
+		}
+
+		if !slices.Equal(m.LabelNames(), labelNames) {
+			s.log.Panicf(
+				"metric %q already defined with differing label names: %+v",
+				metricName,
+				labelNames,
+			)
+		}
+
+		return m.(*httpGaugeMetric)
+	}
+
+	gauge := &httpGaugeMetric{
+		httpBaseMetric: &httpBaseMetric{
+			name:  metricName,
+			descr: descr,
+			// Cloned so later writes to the caller's slice cannot change
+			// the label set this metric was defined with.
+			labelNames: slices.Clone(labelNames),
+		},
+		values: make(map[bucketKey]*httpGauge),
+	}
+	s.metrics[metricName] = gauge
+	return gauge
+}
diff --git a/metrics/metricbus/mbclient/httpserver_metrics.go b/metrics/metricbus/mbclient/httpserver_metrics.go
new file mode 100644
index 0000000..695361e
--- /dev/null
+++ b/metrics/metricbus/mbclient/httpserver_metrics.go
@@ -0,0 +1,254 @@
+package mbclient
+
+import (
+	"fmt"
+	"strings"
+	"sync"
+
+	"go.fuhry.dev/runtime/metrics/metricbus"
+)
+
+// labelValueSep separates label values inside a bucketKey.
+const labelValueSep = "\x00"
+
+// bucket is one exported sample: its rendered name="value" label pairs and
+// its current value.
+type bucket struct {
+	KV []string
+	V  float64
+}
+
+// bucketKey identifies one label-value combination of a metric. It is the
+// label values, in labelNames order, joined by labelValueSep.
+type bucketKey string
+
+// httpMetric is what the /metrics handler needs from a metric to render it.
+type httpMetric interface {
+	Type() string
+	Help() string
+	Get() []bucket
+	LabelNames() []string
+}
+
+// httpBaseMetric holds the definition shared by counters and gauges.
+type httpBaseMetric struct {
+	name, descr string
+
+	labelNames []string
+}
+
+// httpCounterMetric is a counter definition plus one httpCounter per
+// label-value combination requested so far. mu guards values.
+type httpCounterMetric struct {
+	*httpBaseMetric
+
+	values map[bucketKey]*httpCounter
+	mu     sync.RWMutex
+}
+
+// httpGaugeMetric is a gauge definition plus one httpGauge per label-value
+// combination requested so far. mu guards values.
+type httpGaugeMetric struct {
+	*httpBaseMetric
+
+	values map[bucketKey]*httpGauge
+	mu     sync.RWMutex
+}
+
+// httpCounter is the value of a counter for one label-value combination. mu
+// guards v.
+type httpCounter struct {
+	v  float64
+	mu sync.Mutex
+}
+
+// httpGauge is the value of a gauge for one label-value combination. mu
+// guards v.
+type httpGauge struct {
+	v  float64
+	mu sync.Mutex
+}
+
+// toKV renders k as name="value" pairs by pairing labelNames with the values
+// encoded in k, position by position. k must have been built by makeKey from
+// the same labelNames.
+func (k bucketKey) toKV(labelNames []string) []string {
+	parts := strings.Split(string(k), labelValueSep)
+	out := make([]string, 0, len(labelNames))
+	for i, l := range labelNames {
+		out = append(out, fmt.Sprintf("%s=%q", l, parts[i]))
+	}
+	return out
+}
+
+// makeKey builds the bucketKey for kv. Labels that kv does not set contribute
+// an empty value; keys of kv that are not in labelNames are ignored.
+func (m *httpBaseMetric) makeKey(kv metricbus.KV) bucketKey {
+	values := make([]string, 0, len(m.labelNames))
+	for _, l := range m.labelNames {
+		values = append(values, kv[l])
+	}
+	return bucketKey(strings.Join(values, labelValueSep))
+}
+
+// Counter implementation
+
+// Type returns the metric type written on the TYPE line.
+func (m *httpCounterMetric) Type() string {
+	return "counter"
+}
+
+// Help returns the description written on the HELP line.
+func (m *httpCounterMetric) Help() string {
+	return m.descr
+}
+
+// LabelNames returns the label names the counter was defined with.
+func (m *httpCounterMetric) LabelNames() []string {
+	return m.labelNames
+}
+
+// Get returns one bucket per label-value combination requested so far, in no
+// particular order.
+func (m *httpCounterMetric) Get() []bucket {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+
+	out := make([]bucket, 0, len(m.values))
+	for k, c := range m.values {
+		out = append(out, bucket{
+			KV: k.toKV(m.labelNames),
+			V:  c.get(),
+		})
+	}
+	return out
+}
+
+// Add is not supported on the metric itself; call WithLabelValues first.
+func (m *httpCounterMetric) Add(_ float64) {
+	panic("unsupported")
+}
+
+// WithLabelValues returns the counter for the label values in kv, creating it
+// at zero on first use.
+func (m *httpCounterMetric) WithLabelValues(kv metricbus.KV) CounterMetric {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	k := m.makeKey(kv)
+	c, ok := m.values[k]
+	if !ok {
+		c = &httpCounter{}
+		m.values[k] = c
+	}
+	return c
+}
+
+// Add increases the counter by v.
+func (c *httpCounter) Add(v float64) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	c.v += v
+}
+
+// get returns the current value. It takes c.mu so that a scrape does not
+// race with a concurrent Add.
+func (c *httpCounter) get() float64 {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	return c.v
+}
+
+// WithLabelValues is not supported on a counter that already has its label
+// values bound.
+func (c *httpCounter) WithLabelValues(_ metricbus.KV) CounterMetric {
+	panic("unsupported")
+}
+
+// Gauge implementation
+
+// Type returns the metric type written on the TYPE line.
+func (m *httpGaugeMetric) Type() string {
+	return "gauge"
+}
+
+// Help returns the description written on the HELP line.
+func (m *httpGaugeMetric) Help() string {
+	return m.descr
+}
+
+// LabelNames returns the label names the gauge was defined with.
+func (m *httpGaugeMetric) LabelNames() []string {
+	return m.labelNames
+}
+
+// WithLabelValues returns the gauge for the label values in kv, creating it
+// at zero on first use.
+func (m *httpGaugeMetric) WithLabelValues(kv metricbus.KV) GaugeMetric {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	k := m.makeKey(kv)
+	g, ok := m.values[k]
+	if !ok {
+		g = &httpGauge{}
+		m.values[k] = g
+	}
+	return g
+}
+
+// Set is not supported on the metric itself; call WithLabelValues first.
+func (m *httpGaugeMetric) Set(_ float64) {
+	panic("unsupported")
+}
+
+// Reset is not supported on the metric itself; call WithLabelValues first.
+func (m *httpGaugeMetric) Reset() {
+	panic("unsupported")
+}
+
+// Get returns one bucket per label-value combination requested so far, in no
+// particular order.
+func (m *httpGaugeMetric) Get() []bucket {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+
+	out := make([]bucket, 0, len(m.values))
+	for k, g := range m.values {
+		out = append(out, bucket{
+			KV: k.toKV(m.labelNames),
+			V:  g.get(),
+		})
+	}
+	return out
+}
+
+// Set replaces the gauge's value with v.
+func (c *httpGauge) Set(v float64) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	c.v = v
+}
+
+// get returns the current value. It takes c.mu so that a scrape does not
+// race with a concurrent Set.
+func (c *httpGauge) get() float64 {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	return c.v
+}
+
+// Reset sets the gauge back to zero.
+func (c *httpGauge) Reset() {
+	c.Set(0)
+}
+
+// WithLabelValues is not supported on a gauge that already has its label
+// values bound.
+func (c *httpGauge) WithLabelValues(_ metricbus.KV) GaugeMetric {
+	panic("unsupported")
+}
diff --git a/metrics/metricbus/mbclient/httpserver_utils.go b/metrics/metricbus/mbclient/httpserver_utils.go
new file mode 100644
index 0000000..c8b7ee9
--- /dev/null
+++ b/metrics/metricbus/mbclient/httpserver_utils.go
@@ -0,0 +1,56 @@
+package mbclient
+
+import (
+	"context"
+	"crypto/tls"
+	"flag"
+	"net"
+
+	"go.fuhry.dev/runtime/metrics/metricbus/mbserver"
+	"go.fuhry.dev/runtime/mtls"
+)
+
+// httpIdentity is the mTLS service identity used by the metrics HTTP server.
+// It can be changed with the -metricbus.mtls-id flag.
+var httpIdentity = "otel-tls"
+
+// makeTlsListener binds a TCP listener on the IPv6 unspecified address at the
+// default metricbus HTTP server port and wraps it in TLS using the
+// httpIdentity service identity. A peer name verifier is installed on the TLS
+// config and allows the "prometheus" and "healthcheck" services. It returns
+// the TLS listener together with the address of the underlying TCP listener.
+func makeTlsListener(ctx context.Context) (net.Listener, net.Addr, error) {
+	mtlsId := mtls.NewServiceIdentity(httpIdentity)
+
+	netListener, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.IPv6zero, Port: int(mbserver.DefaultMetricBusHttpServerPort())})
+	if err != nil {
+		return nil, nil, err
+	}
+
+	tlsc, err := mtlsId.TlsConfig(ctx)
+	if err != nil {
+		// Release the port: the caller retries, and a listener left open
+		// here would keep it bound for the life of the process.
+		netListener.Close()
+		return nil, nil, err
+	}
+	cv := mtls.NewPeerNameVerifier()
+	err = cv.ConfigureServer(tlsc)
+	if err != nil {
+		netListener.Close()
+		return nil, nil, err
+	}
+	cv.AllowFrom(mtls.Service, "prometheus")
+	cv.AllowFrom(mtls.Service, "healthcheck")
+
+	return tls.NewListener(netListener, tlsc), netListener.Addr(), nil
+}
+
+// init registers the -metricbus.mtls-id flag.
+func init() {
+	flag.StringVar(
+		&httpIdentity,
+		"metricbus.mtls-id",
+		httpIdentity,
+		"mtls id for metrics http server")
+}
diff --git a/metrics/metricbus/internal/BUILD.bazel b/metrics/metricbus/mbserver/BUILD.bazel
similarity index 75%
rename from metrics/metricbus/internal/BUILD.bazel
rename to metrics/metricbus/mbserver/BUILD.bazel
index 659a769..6e32ee3 100644
--- a/metrics/metricbus/internal/BUILD.bazel
+++ b/metrics/metricbus/mbserver/BUILD.bazel
@@ -1,13 +1,13 @@
 load("@rules_go//go:def.bzl", "go_library")
 
 go_library(
-    name = "internal",
+    name = "mbserver",
     srcs = [
         "dbus_interface.go",
         "server.go",
     ],
-    importpath = "go.fuhry.dev/runtime/metrics/metricbus/internal",
-    visibility = ["//metrics/metricbus:__subpackages__"],
+    importpath = "go.fuhry.dev/runtime/metrics/metricbus/mbserver",
+    visibility = ["//visibility:public"],
     deps = [
         "//constants",
         "//metrics/metricbus",
diff --git a/metrics/metricbus/internal/dbus_interface.go b/metrics/metricbus/mbserver/dbus_interface.go
similarity index 98%
rename from metrics/metricbus/internal/dbus_interface.go
rename to metrics/metricbus/mbserver/dbus_interface.go
index 58e25d7..0a021fc 100644
--- a/metrics/metricbus/internal/dbus_interface.go
+++ b/metrics/metricbus/mbserver/dbus_interface.go
@@ -1,4 +1,4 @@
-package internal
+package mbserver
 
 import (
 	"flag"
diff --git a/metrics/metricbus/internal/server.go b/metrics/metricbus/mbserver/server.go
similarity index 99%
rename from metrics/metricbus/internal/server.go
rename to metrics/metricbus/mbserver/server.go
index fd6dc50..59c9a11 100644
--- a/metrics/metricbus/internal/server.go
+++ b/metrics/metricbus/mbserver/server.go
@@ -1,4 +1,4 @@
-package
internal
+package mbserver
 
 import (
 	"context"
diff --git a/utils/rollout/rollout.go b/utils/rollout/rollout.go
new file mode 100644
index 0000000..7974ce5
--- /dev/null
+++ b/utils/rollout/rollout.go
@@ -0,0 +1,396 @@
+// Package rollout allows for features or behavior changes to be rolled out incrementally.
+//
+// The rollout is based on a percentage chance. The percentage defaults to zero but can be
+// initialized to any hardcoded value or overridden through flags.
+//
+// When a rollout is overridden in etcd, under the key /rollout/NAME, the value stored
+// there is used until the key is deleted; the rollout then returns to its default.
+package rollout
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"maps"
+	"math"
+	"math/rand"
+	"regexp"
+	"slices"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+
+	"go.etcd.io/etcd/api/v3/mvccpb"
+	etcd_client "go.etcd.io/etcd/client/v3"
+
+	"go.fuhry.dev/runtime/sd"
+	"go.fuhry.dev/runtime/utils/log"
+)
+
+// Rollout is a named feature gate that is enabled for a percentage of checks.
+type Rollout interface {
+	// Enabled reports whether the gated behavior should be used. Each call
+	// is an independent random draw against the current percentage.
+	Enabled() bool
+	// Name returns the name the rollout was registered under.
+	Name() string
+	// Pct returns the current percentage, clamped to the range 0-100.
+	Pct() float64
+}
+
+// Opts configures a Rollout.
+type Opts struct {
+	// Pct is the default percentage, used while no etcd override exists.
+	Pct float64
+	// OnChange, if set, is called whenever a value is loaded from etcd or the
+	// rollout falls back to its default.
+	OnChange func(Rollout)
+}
+
+// rolloutImpl is the Rollout returned by New.
+type rolloutImpl struct {
+	name string
+	opts Opts
+
+	// pctMu guards pct and opts.Pct. They are written by the watch goroutine
+	// and by flag parsing, and read by whichever goroutine calls Enabled/Pct.
+	pctMu sync.RWMutex
+	pct   float64
+
+	// initMu is locked by New and released once the first etcd refresh has
+	// been attempted; waitInit blocks on it. initOnce guards the release.
+	initMu   sync.Mutex
+	initOnce sync.Once
+}
+
+const (
+	nameFragment = `[a-z0-9](?:[a-z0-9_]*?[a-z0-9])?`
+	opTimeout    = 1 * time.Second
+)
+
+// registry maps rollout names to the rollouts created by New.
+var registry map[string]Rollout
+
+var defaultOpts = Opts{
+	Pct: 0.0,
+}
+
+var validName = regexp.MustCompile("^" + nameFragment + "$")
+var validFlag = regexp.MustCompile("^(" + nameFragment + `)=([0-9]+(\.[0-9]+)?)$`)
+var logger = log.WithPrefix("rollout")
+
+// New registers a rollout called name and starts a goroutine that follows its
+// etcd override. A nil opts selects a default of 0%. Invalid or duplicate
+// names are reported through log.Panicf.
+//
+// Enabled blocks on the returned Rollout until flags have been parsed, an
+// etcd client has been created and the first refresh has been attempted.
+func New(name string, opts *Opts) Rollout {
+	if !validName.MatchString(name) {
+		log.Panicf("rollout switch name must consist only of lowercase letters, numbers and underscores: %q", name)
+		return nil
+	}
+
+	if _, ok := registry[name]; ok {
+		log.Panicf("rollout %q registered more than once", name)
+		return nil
+	}
+
+	if opts == nil {
+		opts = &defaultOpts
+	}
+	r := &rolloutImpl{
+		name: name,
+		opts: *opts,
+		pct:  opts.Pct,
+	}
+	r.initMu.Lock()
+
+	if registry == nil {
+		registry = make(map[string]Rollout)
+	}
+	registry[name] = r
+
+	logger.V(1).Infof(
+		"rollout %q initialized at %.1f%%",
+		name, r.pct)
+
+	go r.watch()
+
+	return r
+}
+
+// Name returns the name the rollout was registered under.
+func (r *rolloutImpl) Name() string {
+	return r.name
+}
+
+// Enabled waits for initialization and then reports true with a probability
+// of Pct()/100.
+func (r *rolloutImpl) Enabled() bool {
+	r.waitInit()
+
+	// Use the live percentage, not the default in opts, so that etcd
+	// overrides take effect even when the default is 0 or 100.
+	pct := r.currentPct()
+	switch {
+	case pct >= 100.0:
+		return true
+	case pct <= 0.0:
+		return false
+	}
+
+	// rand.Float64 is uniform over [0, 1): the draw is below pct with
+	// probability pct/100.
+	return (100 * rand.Float64()) < pct
+}
+
+// Pct returns the live percentage, clamped to 0 <= v <= 100.
+func (r *rolloutImpl) Pct() float64 {
+	return min(max(r.currentPct(), 0.0), 100.0)
+}
+
+// currentPct returns the live, unclamped percentage.
+func (r *rolloutImpl) currentPct() float64 {
+	r.pctMu.RLock()
+	defer r.pctMu.RUnlock()
+
+	return r.pct
+}
+
+// defaultPct returns the percentage used when there is no etcd override.
+func (r *rolloutImpl) defaultPct() float64 {
+	r.pctMu.RLock()
+	defer r.pctMu.RUnlock()
+
+	return r.opts.Pct
+}
+
+// setPct stores v as the live percentage and then calls OnChange, if set.
+// The callback runs without pctMu held, so it may call Pct.
+func (r *rolloutImpl) setPct(v float64) {
+	r.pctMu.Lock()
+	r.pct = v
+	r.pctMu.Unlock()
+
+	if r.opts.OnChange != nil {
+		r.opts.OnChange(r)
+	}
+}
+
+// waitInit blocks until the watch goroutine has released initMu.
+func (r *rolloutImpl) waitInit() {
+	r.initMu.Lock()
+	r.initMu.Unlock()
+}
+
+// watch runs for the life of the process. Once flags are parsed it connects
+// to etcd, loads the current override, releases waitInit and then applies
+// PUT and DELETE events for the rollout's key as they arrive. A failed watch
+// is re-created, and the etcd client too if the error is not recoverable.
+func (r *rolloutImpl) watch() {
+	const retryInterval = 5 * time.Second
+
+	var etcd *etcd_client.Client
+	var err error
+	var watcher etcd_client.WatchChan
+
+	ctx := context.Background()
+	var wCtx context.Context
+	var wCancel context.CancelFunc
+
+	for {
+		if !flag.Parsed() {
+			// wait until flags are parsed to init the watcher
+			time.Sleep(50 * time.Millisecond)
+			continue
+		}
+
+		if etcd == nil {
+			etcd, err = sd.NewDefaultEtcdClient()
+			if err != nil {
+				logger.V(1).Warningf(
+					"failed to init etcd client, retrying in %s: %v",
+					retryInterval,
+					err)
+
+				etcd = nil
+				time.Sleep(retryInterval)
+				continue
+			}
+
+			r.refresh(etcd)
+			r.initOnce.Do(func() { r.initMu.Unlock() })
+		}
+
+		if watcher == nil {
+			wCtx, wCancel = context.WithCancel(etcd_client.WithRequireLeader(ctx))
+			watcher = etcd.Watch(
+				wCtx,
+				r.etcdKey())
+		}
+
+	events:
+		for {
+			select {
+			case event, ok := <-watcher:
+				err := event.Err()
+				if !ok || err != nil {
+					// The watch is finished, either because it failed or
+					// because its channel was closed. Drop it and leave the
+					// event loop so the outer loop can build a new one. A
+					// bare break would only leave the select, and the next
+					// receive on the nil watcher would block forever.
+					wCancel()
+					watcher = nil
+
+					if err != nil && !sd.ErrorIsRecoverable(err) {
+						etcd.Close()
+						etcd = nil
+					}
+					// Pause so a persistently failing watch cannot spin.
+					time.Sleep(retryInterval)
+					break events
+				}
+
+				for _, ev := range event.Events {
+					if string(ev.Kv.Key) != r.etcdKey() {
+						continue
+					}
+					switch ev.Type {
+					case mvccpb.PUT:
+						if err := r.load(ev.Kv); err != nil {
+							logger.V(1).Warning(err)
+						}
+					case mvccpb.DELETE:
+						def := r.defaultPct()
+						logger.V(1).Noticef(
+							"etcd override for rollout %q was deleted, falling back to default "+
+								"rollout of %.1f%%",
+							r.Name(),
+							def)
+						r.setPct(def)
+					}
+				}
+			case <-ctx.Done():
+				wCancel()
+				etcd.Close()
+				return
+			}
+		}
+	}
+}
+
+// etcdKey returns the etcd key that holds this rollout's override.
+func (r *rolloutImpl) etcdKey() string {
+	return fmt.Sprintf("/rollout/%s", r.name)
+}
+
+// refresh reads the rollout's key from etcd and, if present, loads its
+// value. An unparseable value resets the rollout to its default. Errors are
+// logged here and also returned.
+func (r *rolloutImpl) refresh(client *etcd_client.Client) error {
+	ctx, cancel := context.WithTimeout(context.Background(), opTimeout)
+	defer cancel()
+
+	result, err := client.Get(ctx, r.etcdKey())
+	if err != nil {
+		err = fmt.Errorf(
+			"while refreshing rollout switch %q: failed to read etcd key %q: %w",
+			r.Name(),
+			r.etcdKey(),
+			err)
+		logger.V(1).Warning(err)
+		return err
+	}
+
+	for _, kv := range result.Kvs {
+		if string(kv.Key) != r.etcdKey() {
+			continue
+		}
+
+		if err := r.load(kv); err != nil {
+			def := r.defaultPct()
+			logger.V(1).Warning(err)
+			logger.V(1).Noticef("falling back to default rollout percentage: %.1f%%", def)
+			r.setPct(def)
+			return err
+		}
+		return nil
+	}
+	return nil
+}
+
+// load parses kv's value as a percentage, clamps it to 0-100, stores it as
+// the live percentage and calls OnChange. On a parse error the live
+// percentage is left unchanged.
+func (r *rolloutImpl) load(kv *mvccpb.KeyValue) error {
+	v, err := strconv.ParseFloat(strings.TrimSpace(string(kv.Value)), 64)
+	if err == nil && math.IsNaN(v) {
+		// ParseFloat accepts "NaN", and every comparison with NaN is false,
+		// so it would pass the range check below and disable clamping.
+		err = strconv.ErrSyntax
+	}
+	if err != nil {
+		return fmt.Errorf(
+			"while refreshing rollout switch %q: failed to parse current chance from "+
+				"etcd key %q: %w",
+			r.Name(),
+			r.etcdKey(),
+			err)
+	}
+
+	if v < 0 || v > 100.0 {
+		logger.V(1).Warningf(
+			"while refreshing rollout switch %q: value out of range, clamping: %.3f",
+			r.Name(),
+			v)
+
+		v = min(max(v, 0.0), 100.0)
+	}
+
+	logger.Noticef("rollout of feature gate %q updated to %.1f%%", r.Name(), v)
+	r.setPct(v)
+	return nil
+}
+
+// parseRollout handles one -rollout flag value of the form name=pct. The
+// percentage is clamped to 0-100 and becomes both the default and the live
+// value of the named rollout.
+func parseRollout(v string) error {
+	match := validFlag.FindStringSubmatch(v)
+	if len(match) < 3 {
+		return fmt.Errorf("not a valid feature flag override expression: %q", v)
+	}
+
+	key, pctStr := match[1], match[2]
+	entry, ok := registry[key]
+	if !ok {
+		return fmt.Errorf("unknown feature flag: %q", key)
+	}
+	pct, err := strconv.ParseFloat(pctStr, 64)
+	if err != nil {
+		return fmt.Errorf("error parsing rollout percentage flag for %q: %v", key, err)
+	}
+	pct = min(max(pct, 0.0), 100.0)
+	logger.V(1).Infof(
+		"rollout %q overridden to %.1f%% via flags",
+		key, pct)
+
+	r := entry.(*rolloutImpl)
+	r.pctMu.Lock()
+	r.opts.Pct, r.pct = pct, pct
+	r.pctMu.Unlock()
+	return nil
+}
+
+// init registers the -rollout flag.
+func init() {
+	// NOTE(review): registry is filled by New, which importing packages call
+	// from their package-level variables. Those are initialized after this
+	// package's init has run, so ff is presumably always empty here and the
+	// usage text lists no rollouts. Confirm, then consider dropping the list.
+	ff := slices.Sorted(maps.Keys(registry))
+	flag.Func("rollout", "syntax: flag=n (valid feature flags: "+strings.Join(ff, ", ")+")", parseRollout)
+}