- rename metricbus/internal -> metricbus/mbserver due to import from //cmd/metricbus_server
- add httpserver transport
- add utils/rollout library for controlling incremental (percentage-based) rollouts
deps = [
"//constants",
"//metrics/metricbus",
- "//metrics/metricbus/internal",
+ "//metrics/metricbus/mbserver",
"//mtls",
"//sd",
"//utils/hostname",
--- /dev/null
+package mbclient
+
+import (
+ "context"
+
+ "go.fuhry.dev/runtime/utils/rollout"
+)
+
+// httpRollout is the rollout switch consulted by NewServiceWithDiscriminator to
+// choose between the D-Bus and HTTP transports. It is registered under the name
+// "metricbus_dbus_to_http" with a default of 0%.
+var httpRollout = rollout.New("metricbus_dbus_to_http", &rollout.Opts{Pct: 0})
+
+// NewService returns a MetricBusService for the default instance discriminator.
+// It delegates to NewServiceWithDiscriminator, which picks the transport.
+func NewService(ctx context.Context) MetricBusService {
+ return NewServiceWithDiscriminator(ctx, defaultMetricBusServiceDiscriminator)
+}
+
+// NewServiceWithDiscriminator returns a MetricBusService for the given instance
+// discriminator. The HTTP transport is used when the httpRollout switch reports
+// enabled for this call; otherwise the D-Bus transport is used.
+func NewServiceWithDiscriminator(ctx context.Context, instanceDiscriminator string) MetricBusService {
+	if !httpRollout.Enabled() {
+		return NewDBusServiceWithDiscriminator(ctx, instanceDiscriminator)
+	}
+	return NewHTTPServiceWithDiscriminator(ctx, instanceDiscriminator)
+}
"github.com/godbus/dbus/v5"
"go.fuhry.dev/runtime/metrics/metricbus"
- mbinternal "go.fuhry.dev/runtime/metrics/metricbus/internal"
+ "go.fuhry.dev/runtime/metrics/metricbus/mbserver"
"go.fuhry.dev/runtime/utils/log"
)
}
func newDbusConnection(ctx context.Context) (*dbus.Conn, error) {
- conn, err := mbinternal.DbusConn()
+ conn, err := mbserver.DbusConn()
if err != nil {
return nil, err
}
--- /dev/null
+package mbclient
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "net/http"
+ "slices"
+ "strings"
+ "sync"
+ "time"
+
+ "go.fuhry.dev/runtime/constants"
+ "go.fuhry.dev/runtime/metrics/metricbus"
+ "go.fuhry.dev/runtime/sd"
+ "go.fuhry.dev/runtime/utils/hostname"
+ "go.fuhry.dev/runtime/utils/log"
+)
+
+// metricBusHttpServiceImpl is the MetricBusService implementation that exposes
+// metrics over an HTTP endpoint and advertises that endpoint via service
+// discovery.
+type metricBusHttpServiceImpl struct {
+ // name is emitted as the "_task" label on every sample.
+ name string
+ // instance is the instance discriminator; it is emitted as the "_shard"
+ // label unless it equals metricbus.SingletonInstanceDiscriminator.
+ instance string
+
+ // sdp publishes/unpublishes the listening port in service discovery.
+ sdp *sd.SDPublisher
+ // server serves the /metrics and /healthz handlers.
+ server *http.Server
+ log log.Logger
+
+ // ctx bounds the lifetime of the background run goroutine.
+ ctx context.Context
+
+ // metrics holds every defined metric, keyed by metric name.
+ metrics map[string]httpMetric
+
+ // mu is held by DefineCounter and DefineGauge while they access metrics.
+ mu sync.Mutex
+}
+
+// NewHTTPService returns an HTTP-backed MetricBusService for the default
+// instance discriminator. See NewHTTPServiceWithDiscriminator.
+func NewHTTPService(ctx context.Context) MetricBusService {
+ return NewHTTPServiceWithDiscriminator(ctx, defaultMetricBusServiceDiscriminator)
+}
+
+// NewHTTPServiceWithDiscriminator builds an HTTP-backed MetricBusService and
+// starts its background run goroutine, which listens, serves and publishes the
+// endpoint until ctx is cancelled.
+//
+// The service discovery shard is the default service name, suffixed with
+// "-<instanceDiscriminator>" when the discriminator is neither empty nor the
+// singleton discriminator.
+func NewHTTPServiceWithDiscriminator(ctx context.Context, instanceDiscriminator string) MetricBusService {
+	logger := log.WithPrefix(fmt.Sprintf("metricbus.httpserver_client[%s]", defaultMetricBusServiceName))
+
+	mux := http.NewServeMux()
+	mw := log.NewLoggingMiddlewareWithLogger(mux, logger)
+
+	sdShard := defaultMetricBusServiceName
+	isDefaultInstance := instanceDiscriminator == metricbus.SingletonInstanceDiscriminator ||
+		instanceDiscriminator == ""
+	if !isDefaultInstance {
+		sdShard += "-" + instanceDiscriminator
+	}
+
+	publisher := &sd.SDPublisher{
+		Protocol:  sd.ProtocolTCP,
+		Service:   "otel-tls",
+		ShardName: sdShard,
+	}
+	httpServer := &http.Server{
+		Handler: mw.HandlerFunc(),
+	}
+
+	svc := &metricBusHttpServiceImpl{
+		name:     defaultMetricBusServiceName,
+		instance: instanceDiscriminator,
+		sdp:      publisher,
+		server:   httpServer,
+		log:      logger,
+		ctx:      ctx,
+		metrics:  make(map[string]httpMetric),
+	}
+
+	// Handlers are bound after construction because they are methods on svc.
+	mux.HandleFunc("/metrics", svc.metricsToString)
+	mux.HandleFunc("/healthz", svc.healthCheck)
+
+	go svc.run()
+
+	return svc
+}
+
+// run owns the lifecycle of the metrics endpoint. It:
+//
+//  1. creates the TLS listener, retrying every retryInterval until it succeeds
+//     or svc.ctx is cancelled;
+//  2. serves HTTP on that listener in a separate goroutine;
+//  3. publishes the listening port via service discovery, retrying every
+//     retryInterval until it succeeds or svc.ctx is cancelled;
+//  4. waits for svc.ctx to be cancelled, then shuts the server down (bounded
+//     to 5 seconds) and unpublishes the service.
+//
+// If svc.ctx is cancelled before a listener could be created, run returns
+// without shutting anything down, since nothing was started.
+func (svc *metricBusHttpServiceImpl) run() {
+	const retryInterval = 5 * time.Second
+
+	var (
+		listener net.Listener
+		tcpAddr  *net.TCPAddr
+	)
+	for {
+		l, addr, err := makeTlsListener(svc.ctx)
+		if err != nil {
+			svc.log.Errorf("failed to init tls listener, retrying in %s: %v", retryInterval, err)
+		} else if ta, ok := addr.(*net.TCPAddr); ok {
+			listener, tcpAddr = l, ta
+			break
+		} else {
+			l.Close()
+			svc.log.Alertf("listener address is %T, not *net.TCPAddr", addr)
+		}
+
+		// Wait before retrying, but give up promptly if we are asked to stop.
+		select {
+		case <-time.After(retryInterval):
+		case <-svc.ctx.Done():
+			return
+		}
+	}
+
+	go func() {
+		// Serve always returns a non-nil error; ErrServerClosed is the normal
+		// result of the Shutdown call below.
+		if err := svc.server.Serve(listener); err != nil && err != http.ErrServerClosed {
+			svc.log.Errorf("metrics http server stopped unexpectedly: %v", err)
+		}
+	}()
+	svc.log.Noticef("serving metrics on %s", tcpAddr.String())
+	svc.sdp.AdvertisePort = uint16(tcpAddr.Port)
+
+	// Try to publish on startup; if that fails, retry either until we succeed
+	// or we are asked to shut down.
+	if err := svc.sdp.Publish(svc.ctx); err != nil {
+		svc.log.Alertf("failed to publish sd service: %v", err)
+
+		ticker := time.NewTicker(retryInterval)
+	publishLoop:
+		for {
+			select {
+			case <-ticker.C:
+				if err := svc.sdp.Publish(svc.ctx); err != nil {
+					svc.log.Alertf("failed to publish sd service: %v", err)
+					continue
+				}
+				break publishLoop
+			case <-svc.ctx.Done():
+				break publishLoop
+			}
+		}
+		ticker.Stop()
+	}
+
+	// Returns immediately if cancellation was already observed above.
+	<-svc.ctx.Done()
+
+	// svc.ctx is already cancelled, so shutdown gets its own bounded context.
+	shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	if err := svc.server.Shutdown(shutdownCtx); err != nil {
+		svc.log.Errorf("error shutting down metrics http server: %v", err)
+	}
+	svc.sdp.Unpublish()
+}
+
+// metricsToString is the /metrics handler. It writes every defined metric in
+// the Prometheus text exposition format (content type version 0.0.4): a HELP
+// line, a TYPE line, then one sample line per label combination.
+//
+// Every sample carries the "_task" and "_host" labels, plus "_shard" when the
+// service is not the singleton instance. Metrics are emitted in name order so
+// that consecutive scrapes are stable.
+func (s *metricBusHttpServiceImpl) metricsToString(w http.ResponseWriter, r *http.Request) {
+	w.Header().Set("content-type", "text/plain; version=0.0.4; charset=utf-8")
+
+	// Snapshot the registry under the lock: DefineCounter/DefineGauge may
+	// insert into s.metrics concurrently, and an unguarded range over the map
+	// would race with them. The lock is released before writing to the client
+	// so a slow reader cannot block metric definition.
+	s.mu.Lock()
+	names := make([]string, 0, len(s.metrics))
+	snapshot := make(map[string]httpMetric, len(s.metrics))
+	for name, metric := range s.metrics {
+		names = append(names, name)
+		snapshot[name] = metric
+	}
+	s.mu.Unlock()
+	slices.Sort(names)
+
+	// The base labels are identical for every sample, so build them once.
+	baseLabelValues := []string{
+		fmt.Sprintf("%s=%q", "_task", s.name),
+		fmt.Sprintf("%s=%q", "_host", hostname.Fqdn()),
+	}
+	if s.instance != metricbus.SingletonInstanceDiscriminator {
+		baseLabelValues = append(baseLabelValues, fmt.Sprintf("%s=%q", "_shard", s.instance))
+	}
+
+	for _, metricName := range names {
+		metric := snapshot[metricName]
+
+		fmt.Fprintf(w, "# HELP %s %s\n", metricName, metric.Help())
+		fmt.Fprintf(w, "# TYPE %s %s\n", metricName, metric.Type())
+
+		for _, b := range metric.Get() {
+			// Fresh slice per sample so appends never alias baseLabelValues.
+			labels := make([]string, 0, len(baseLabelValues)+len(b.KV))
+			labels = append(labels, baseLabelValues...)
+			labels = append(labels, b.KV...)
+
+			fmt.Fprintf(
+				w,
+				"%s{%s} %.2f\n",
+				metricName,
+				strings.Join(labels, ","),
+				b.V,
+			)
+		}
+
+		fmt.Fprintf(w, "\n")
+	}
+}
+
+// healthCheck is the /healthz handler. It always answers 200 OK with a short
+// HTML heading that includes the organization name.
+func (s *metricBusHttpServiceImpl) healthCheck(w http.ResponseWriter, r *http.Request) {
+	w.WriteHeader(http.StatusOK)
+	fmt.Fprintf(w, "<h1>%s Metric Collector</h1>", constants.OrgName)
+}
+
+// Flush is a no-op for the HTTP transport.
+// NOTE(review): presumably nothing needs flushing because values are read on
+// demand by the /metrics handler — confirm against the MetricBusService contract.
+func (*metricBusHttpServiceImpl) Flush() {
+}
+
+// FlushAndWait is a no-op for the HTTP transport; it returns immediately.
+func (*metricBusHttpServiceImpl) FlushAndWait() {
+}
+
+// DefineCounter registers a counter called metricName with the given help
+// text and label names, and returns it.
+//
+// Defining the same name again returns the existing counter, provided it was
+// defined as a counter with exactly the same label names (same order);
+// otherwise the mismatch is reported through the logger's Panicf.
+func (s *metricBusHttpServiceImpl) DefineCounter(
+	metricName, descr string,
+	labelNames ...string,
+) CounterMetric {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	existing, found := s.metrics[metricName]
+	if !found {
+		created := &httpCounterMetric{
+			httpBaseMetric: &httpBaseMetric{
+				name:       metricName,
+				descr:      descr,
+				labelNames: labelNames,
+			},
+			values: make(map[bucketKey]*httpCounter),
+		}
+		s.metrics[metricName] = created
+		return created
+	}
+
+	if _, isCounter := existing.(*httpCounterMetric); !isCounter {
+		s.log.Panicf(
+			"metric %q already defined as different metric type: %T",
+			metricName,
+			existing,
+		)
+	}
+
+	if !slices.Equal(existing.LabelNames(), labelNames) {
+		s.log.Panicf(
+			"metric %q already defined with differing label names: %+v",
+			metricName,
+			labelNames,
+		)
+	}
+
+	return existing.(*httpCounterMetric)
+}
+
+// DefineGauge registers a gauge called metricName with the given help text
+// and label names, and returns it.
+//
+// Defining the same name again returns the existing gauge, provided it was
+// defined as a gauge with exactly the same label names (same order);
+// otherwise the mismatch is reported through the logger's Panicf.
+func (s *metricBusHttpServiceImpl) DefineGauge(
+	metricName, descr string,
+	labelNames ...string,
+) GaugeMetric {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	existing, found := s.metrics[metricName]
+	if !found {
+		created := &httpGaugeMetric{
+			httpBaseMetric: &httpBaseMetric{
+				name:       metricName,
+				descr:      descr,
+				labelNames: labelNames,
+			},
+			values: make(map[bucketKey]*httpGauge),
+		}
+		s.metrics[metricName] = created
+		return created
+	}
+
+	if _, isGauge := existing.(*httpGaugeMetric); !isGauge {
+		s.log.Panicf(
+			"metric %q already defined as different metric type: %T",
+			metricName,
+			existing,
+		)
+	}
+
+	if !slices.Equal(existing.LabelNames(), labelNames) {
+		s.log.Panicf(
+			"metric %q already defined with differing label names: %+v",
+			metricName,
+			labelNames,
+		)
+	}
+
+	return existing.(*httpGaugeMetric)
+}
--- /dev/null
+package mbclient
+
+import (
+ "fmt"
+ "strings"
+ "sync"
+
+ "go.fuhry.dev/runtime/metrics/metricbus"
+)
+
+// bucket is one exported sample: a set of pre-formatted label pairs and the
+// current value for that label combination.
+type bucket struct {
+ // KV holds label pairs already rendered as name="value" strings.
+ KV []string
+ V float64
+}
+
+// bucketKey identifies one label combination of a metric. It is the metric's
+// label values, in label-name order, joined with a NUL byte (see makeKey).
+type bucketKey string
+
+// httpMetric is the view of a metric that the /metrics handler needs in order
+// to render it.
+type httpMetric interface {
+ // Type returns the metric type string written on the "# TYPE" line.
+ Type() string
+ // Help returns the description written on the "# HELP" line.
+ Help() string
+ // Get returns one bucket per label combination currently tracked.
+ Get() []bucket
+ // LabelNames returns the label names the metric was defined with.
+ LabelNames() []string
+}
+
+// httpBaseMetric holds the definition shared by counters and gauges.
+type httpBaseMetric struct {
+ name, descr string
+
+ labelNames []string
+}
+
+// httpCounterMetric is a counter definition together with its per-label-set
+// children.
+type httpCounterMetric struct {
+ *httpBaseMetric
+
+ values map[bucketKey]*httpCounter
+ // mu guards values.
+ mu sync.RWMutex
+}
+
+// httpGaugeMetric is a gauge definition together with its per-label-set
+// children.
+type httpGaugeMetric struct {
+ *httpBaseMetric
+
+ values map[bucketKey]*httpGauge
+ // mu guards values.
+ mu sync.RWMutex
+}
+
+// httpCounter is the value of a counter for one label combination.
+type httpCounter struct {
+ v float64
+ // mu is held by Add while updating v.
+ mu sync.Mutex
+}
+
+// httpGauge is the value of a gauge for one label combination.
+type httpGauge struct {
+ v float64
+ // mu is held by Set while updating v.
+ mu sync.Mutex
+}
+
+// toKV expands a bucket key back into label pairs. The key is split on NUL
+// bytes and the i-th piece is paired with labelNames[i], rendered as
+// name="value" with the value Go-quoted. It returns nil when labelNames is
+// empty.
+func (k bucketKey) toKV(labelNames []string) []string {
+	values := strings.Split(string(k), "\x00")
+
+	var pairs []string
+	for idx := range labelNames {
+		pairs = append(pairs, fmt.Sprintf("%s=%q", labelNames[idx], values[idx]))
+	}
+	return pairs
+}
+
+func (m *httpBaseMetric) makeKey(kv metricbus.KV) bucketKey {
+ var values []string
+ for _, l := range m.labelNames {
+ v, _ := kv[l]
+ values = append(values, v)
+ }
+ return bucketKey(strings.Join(values, string([]byte{0})))
+}
+
+// Counter implementation
+
+// Type returns "counter", the value written on the metric's "# TYPE" line.
+func (m *httpCounterMetric) Type() string {
+ return "counter"
+}
+
+// Help returns the description the counter was defined with.
+func (m *httpCounterMetric) Help() string {
+ return m.descr
+}
+
+// LabelNames returns the label names the counter was defined with. The
+// returned slice is the metric's own slice, not a copy.
+func (m *httpCounterMetric) LabelNames() []string {
+ return m.labelNames
+}
+
+func (m *httpCounterMetric) Get() []bucket {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+
+ var out []bucket
+ for k, v := range m.values {
+ b := bucket{
+ KV: k.toKV(m.labelNames),
+ V: v.v,
+ }
+ out = append(out, b)
+ }
+ return out
+}
+
+// Add always panics with "unsupported": values can only be added to a child
+// obtained from WithLabelValues, not to the metric definition itself.
+func (m *httpCounterMetric) Add(_ float64) {
+ panic("unsupported")
+}
+
+// WithLabelValues returns the counter child for the label values in kv,
+// creating it (starting at zero) the first time that combination is seen.
+func (m *httpCounterMetric) WithLabelValues(kv metricbus.KV) CounterMetric {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	key := m.httpBaseMetric.makeKey(kv)
+	child, found := m.values[key]
+	if !found {
+		child = &httpCounter{}
+		m.values[key] = child
+	}
+	return child
+}
+
+// Add adds v to the counter's value while holding the counter's mutex.
+// NOTE(review): negative values are not rejected here — confirm whether
+// callers are expected to pass only non-negative increments.
+func (c *httpCounter) Add(v float64) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ c.v += v
+}
+
+// WithLabelValues always panics with "unsupported": a child counter is already
+// bound to one label combination.
+func (c *httpCounter) WithLabelValues(_ metricbus.KV) CounterMetric {
+ panic("unsupported")
+}
+
+// Gauge implementation
+
+// Type returns "gauge", the value written on the metric's "# TYPE" line.
+func (m *httpGaugeMetric) Type() string {
+ return "gauge"
+}
+
+// Help returns the description the gauge was defined with.
+func (m *httpGaugeMetric) Help() string {
+ return m.descr
+}
+
+// LabelNames returns the label names the gauge was defined with. The returned
+// slice is the metric's own slice, not a copy.
+func (m *httpGaugeMetric) LabelNames() []string {
+ return m.labelNames
+}
+
+// WithLabelValues returns the gauge child for the label values in kv,
+// creating it (starting at zero) the first time that combination is seen.
+func (m *httpGaugeMetric) WithLabelValues(kv metricbus.KV) GaugeMetric {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	key := m.httpBaseMetric.makeKey(kv)
+	child, found := m.values[key]
+	if !found {
+		child = &httpGauge{}
+		m.values[key] = child
+	}
+	return child
+}
+
+// Set always panics with "unsupported": values can only be set on a child
+// obtained from WithLabelValues, not on the metric definition itself.
+func (m *httpGaugeMetric) Set(_ float64) {
+ panic("unsupported")
+}
+
+// Reset always panics with "unsupported", for the same reason as Set.
+func (m *httpGaugeMetric) Reset() {
+ panic("unsupported")
+}
+
+func (m *httpGaugeMetric) Get() []bucket {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+
+ var out []bucket
+ for k, v := range m.values {
+ b := bucket{
+ KV: k.toKV(m.labelNames),
+ V: v.v,
+ }
+ out = append(out, b)
+ }
+ return out
+}
+
+// Set replaces the gauge's value with v while holding the gauge's mutex.
+func (c *httpGauge) Set(v float64) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ c.v = v
+}
+
+// Reset sets the gauge back to zero.
+func (c *httpGauge) Reset() {
+ c.Set(0)
+}
+
+// WithLabelValues always panics with "unsupported": a child gauge is already
+// bound to one label combination.
+func (c *httpGauge) WithLabelValues(_ metricbus.KV) GaugeMetric {
+ panic("unsupported")
+}
--- /dev/null
+package mbclient
+
+import (
+ "context"
+ "crypto/tls"
+ "flag"
+ "net"
+
+ "go.fuhry.dev/runtime/metrics/metricbus/mbserver"
+ "go.fuhry.dev/runtime/mtls"
+)
+
+// httpIdentity is the mTLS service identity used by the metrics HTTP listener.
+// It defaults to "otel-tls" and can be overridden with the -metricbus.mtls-id
+// flag registered in init.
+var httpIdentity = "otel-tls"
+
+// makeTlsListener opens a TCP listener on the IPv6 unspecified address at the
+// default metricbus HTTP server port and wraps it in TLS using the
+// httpIdentity mTLS identity. Peers are verified by name, with the
+// "prometheus" and "healthcheck" services allowed.
+//
+// It returns the TLS listener together with the address of the underlying TCP
+// listener. On any failure the TCP listener is closed before returning, so a
+// caller that retries does not find the port still bound by a previous
+// attempt.
+func makeTlsListener(ctx context.Context) (net.Listener, net.Addr, error) {
+	mtlsId := mtls.NewServiceIdentity(httpIdentity)
+
+	netListener, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.IPv6zero, Port: int(mbserver.DefaultMetricBusHttpServerPort())})
+	if err != nil {
+		return nil, nil, err
+	}
+
+	tlsc, err := mtlsId.TlsConfig(ctx)
+	if err != nil {
+		netListener.Close()
+		return nil, nil, err
+	}
+
+	cv := mtls.NewPeerNameVerifier()
+	if err := cv.ConfigureServer(tlsc); err != nil {
+		netListener.Close()
+		return nil, nil, err
+	}
+	cv.AllowFrom(mtls.Service, "prometheus")
+	cv.AllowFrom(mtls.Service, "healthcheck")
+
+	return tls.NewListener(netListener, tlsc), netListener.Addr(), nil
+}
+
+// init registers the -metricbus.mtls-id flag on the default flag set; its
+// default is the initial value of httpIdentity.
+func init() {
+ flag.StringVar(
+ &httpIdentity,
+ "metricbus.mtls-id",
+ httpIdentity,
+ "mtls id for metrics http server")
+}
load("@rules_go//go:def.bzl", "go_library")
go_library(
- name = "internal",
+ name = "mbserver",
srcs = [
"dbus_interface.go",
"server.go",
],
- importpath = "go.fuhry.dev/runtime/metrics/metricbus/internal",
- visibility = ["//metrics/metricbus:__subpackages__"],
+ importpath = "go.fuhry.dev/runtime/metrics/metricbus/mbserver",
+ visibility = ["//visibility:public"],
deps = [
"//constants",
"//metrics/metricbus",
-package internal
+package mbserver
import (
"flag"
-package internal
+package mbserver
import (
"context"
--- /dev/null
+package rollout
+
+// Package rollout allows for features or behavior changes to be rolled out incrementally.
+//
+// The rollout is based on a percentage chance. The percentage defaults to zero but can be
+// initialized to any hardcoded value or overridden through flags.
+//
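+// When a rollout is registered, a background watcher reads an override for it from the etcd
+// key /rollout/<name>; if that key is deleted, the rollout falls back to its default percentage.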
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "maps"
+ "math/rand"
+ "regexp"
+ "slices"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+
+ "go.etcd.io/etcd/api/v3/mvccpb"
+ etcd_client "go.etcd.io/etcd/client/v3"
+
+ "go.fuhry.dev/runtime/sd"
+ "go.fuhry.dev/runtime/utils/log"
+)
+
+// Rollout is a named, percentage-based feature switch.
+type Rollout interface {
+ // Enabled reports whether the feature should be used for this call.
+ Enabled() bool
+ // Name returns the name the rollout was registered under.
+ Name() string
+ // Pct returns the rollout percentage, between 0 and 100.
+ Pct() float64
+}
+
+// Opts configures a rollout created with New.
+type Opts struct {
+ // Pct is the default percentage, used when no override is in effect.
+ Pct float64
+ // OnChange, if non-nil, is called with the rollout after its percentage
+ // is updated from etcd or reset to the default.
+ OnChange func(Rollout)
+}
+
+// rolloutImpl is the Rollout implementation returned by New.
+type rolloutImpl struct {
+ name string
+ // opts is a copy of the options passed to New; opts.Pct is also
+ // overwritten by a -rollout flag override.
+ opts Opts
+ // pct is the percentage currently in effect.
+ // NOTE(review): pct is written by the watcher goroutine and read by
+ // Enabled/Pct without synchronization.
+ pct float64
+ // initMu is locked by New and unlocked once by the watcher after the
+ // first etcd refresh; waitInit blocks on it.
+ initMu sync.Mutex
+ // initOnce ensures initMu is unlocked only once.
+ initOnce sync.Once
+}
+
+const (
+ // nameFragment matches a rollout name: lowercase letters, digits and
+ // underscores, beginning and ending with a letter or digit.
+ nameFragment = `[a-z0-9](?:[a-z0-9_]*?[a-z0-9])?`
+ // opTimeout bounds a single etcd read in refresh.
+ opTimeout = 1 * time.Second
+)
+
+// registry maps rollout names to the rollouts created by New. It is allocated
+// lazily on first registration.
+var registry map[string]Rollout
+
+// defaultOpts is used by New when the caller passes nil options.
+var defaultOpts = Opts{
+ Pct: 0.0,
+}
+
+// validName matches a complete rollout name.
+var validName = regexp.MustCompile("^" + nameFragment + "$")
+
+// validFlag matches a -rollout flag value of the form name=number, where
+// number is a non-negative decimal; group 1 is the name and group 2 the number.
+var validFlag = regexp.MustCompile("^(" + nameFragment + `)=([0-9]+(\.[0-9]+)?)$`)
+var logger = log.WithPrefix("rollout")
+
+// New registers a rollout called name and returns it.
+//
+// name must match the rollout name pattern and must not already be
+// registered; either violation is reported via log.Panicf. A nil opts selects
+// the package defaults (0%). The options are copied, so later changes to
+// *opts have no effect.
+//
+// New also starts a background goroutine that watches etcd for overrides.
+// The returned rollout's Enabled method blocks until that goroutine has
+// completed its first refresh.
+func New(name string, opts *Opts) Rollout {
+	if !validName.MatchString(name) {
+		log.Panicf("rollout switch name must consist only of lowercase letters, numbers and underscores: %q", name)
+		return nil
+	}
+
+	if _, duplicate := registry[name]; duplicate {
+		log.Panicf("rollout %q registered more than once", name)
+		return nil
+	}
+
+	effective := opts
+	if effective == nil {
+		effective = &defaultOpts
+	}
+
+	ro := &rolloutImpl{
+		name: name,
+		opts: *effective,
+		pct:  effective.Pct,
+	}
+	// Held until the watcher has performed its first refresh; see waitInit.
+	ro.initMu.Lock()
+
+	if registry == nil {
+		registry = make(map[string]Rollout)
+	}
+	registry[name] = ro
+
+	logger.V(1).Infof("rollout %q initialized at %.1f%%", name, ro.pct)
+
+	go ro.watch()
+
+	return ro
+}
+
+// Name returns the name the rollout was registered under.
+func (r *rolloutImpl) Name() string {
+ return r.name
+}
+
+// Enabled reports whether the feature should be used for this call.
+//
+// It first blocks until the watcher has completed its initial refresh. The
+// decision is then based on the percentage currently in effect (default, flag
+// override, or etcd override): 100 or more is always enabled, 0 or less is
+// never enabled, and anything in between is enabled with that percentage
+// chance, drawn independently on every call.
+func (r *rolloutImpl) Enabled() bool {
+	r.waitInit()
+
+	pct := r.pct
+	switch {
+	case pct >= 100.0:
+		return true
+	case pct <= 0.0:
+		return false
+	}
+
+	// rand.Float64 is uniform in [0, 1), so this is true with probability
+	// pct/100.
+	return 100*rand.Float64() < pct
+}
+
+// Pct returns the rollout percentage currently in effect (default, flag
+// override, or etcd override), clamped to the range [0, 100].
+func (r *rolloutImpl) Pct() float64 {
+	pct := r.pct
+	switch {
+	case pct >= 100.0:
+		return 100.0
+	case pct <= 0.0:
+		return 0.0
+	}
+	return pct
+}
+
+// waitInit blocks until the rollout has been initialized. New leaves initMu
+// locked and the watcher unlocks it after its first refresh, so acquiring and
+// immediately releasing the mutex here acts as a wait; the empty critical
+// section is intentional.
+// NOTE(review): if the etcd client can never be created, the watcher never
+// unlocks initMu and callers block indefinitely.
+func (r *rolloutImpl) waitInit() {
+ r.initMu.Lock()
+ r.initMu.Unlock()
+}
+
+// watch runs for the lifetime of the process and keeps r.pct in sync with the
+// rollout's etcd key.
+//
+// It waits for command-line flags to be parsed, creates an etcd client
+// (retrying every retryInterval), performs an initial refresh and releases
+// initMu so that Enabled stops blocking. It then watches the key: a PUT loads
+// the new value, a DELETE reverts to the default percentage.
+//
+// When the watch fails or its channel is closed, the watch is cancelled and
+// re-established after retryInterval; if the error is not recoverable the
+// etcd client is recreated as well.
+func (r *rolloutImpl) watch() {
+	const retryInterval = 5 * time.Second
+
+	var etcd *etcd_client.Client
+	var err error
+	var watcher etcd_client.WatchChan
+
+	ctx := context.Background()
+	var wCtx context.Context
+	var wCancel context.CancelFunc
+
+	for {
+		if !flag.Parsed() {
+			// wait until flags are parsed to init the watcher
+			time.Sleep(50 * time.Millisecond)
+			continue
+		}
+
+		if etcd == nil {
+			etcd, err = sd.NewDefaultEtcdClient()
+			if err != nil {
+				logger.V(1).Warningf(
+					"failed to init etcd client, retrying in %s: %v",
+					retryInterval,
+					err)
+
+				time.Sleep(retryInterval)
+				continue
+			}
+
+			r.refresh(etcd)
+			r.initOnce.Do(func() { r.initMu.Unlock() })
+		}
+
+		if watcher == nil {
+			wCtx, wCancel = context.WithCancel(etcd_client.WithRequireLeader(ctx))
+			watcher = etcd.Watch(
+				wCtx,
+				r.etcdKey())
+		}
+
+	events:
+		for {
+			select {
+			case event, ok := <-watcher:
+				watchErr := event.Err()
+				if !ok || watchErr != nil {
+					// The watch is dead: tear it down and leave the event
+					// loop so the outer loop can rebuild it. A plain break
+					// would only leave the select and then block forever on
+					// the nil channel.
+					wCancel()
+					watcher = nil
+
+					if watchErr != nil {
+						logger.V(1).Warningf(
+							"watch on etcd key %q failed, retrying in %s: %v",
+							r.etcdKey(),
+							retryInterval,
+							watchErr)
+
+						if !sd.ErrorIsRecoverable(watchErr) {
+							etcd.Close()
+							etcd = nil
+						}
+					}
+
+					// Back off so a persistently failing watch cannot spin.
+					time.Sleep(retryInterval)
+					break events
+				}
+
+				for _, ev := range event.Events {
+					if string(ev.Kv.Key) != r.etcdKey() {
+						continue
+					}
+					switch ev.Type.String() {
+					case "PUT":
+						r.load(ev.Kv)
+					case "DELETE":
+						logger.V(1).Noticef(
+							"etcd override for rollout %q was deleted, falling back to default "+
+								"rollout of %.1f%%",
+							r.Name(),
+							r.opts.Pct)
+						r.pct = r.opts.Pct
+						if r.opts.OnChange != nil {
+							r.opts.OnChange(r)
+						}
+					}
+				}
+			case <-ctx.Done():
+				wCancel()
+				etcd.Close()
+				return
+			}
+		}
+	}
+}
+
+// etcdKey returns the etcd key holding this rollout's override:
+// "/rollout/" followed by the rollout name.
+func (r *rolloutImpl) etcdKey() string {
+	return "/rollout/" + r.name
+}
+
+// refresh reads the rollout's etcd key once (bounded by opTimeout) and applies
+// the stored value via load.
+//
+// A failed read is logged and returned, leaving the current percentage
+// untouched. If the stored value cannot be loaded, the percentage is reset to
+// the default, OnChange is invoked if set, and the load error is returned. If
+// the key is absent, nothing changes and nil is returned.
+func (r *rolloutImpl) refresh(client *etcd_client.Client) error {
+	key := r.etcdKey()
+
+	ctx, cancel := context.WithTimeout(context.Background(), opTimeout)
+	defer cancel()
+
+	result, getErr := client.Get(ctx, key)
+	if getErr != nil {
+		wrapped := fmt.Errorf(
+			"while refreshing rollout switch %q: failed to read etcd key %q: %v",
+			r.Name(),
+			key,
+			getErr)
+		logger.V(1).Warning(wrapped)
+		return wrapped
+	}
+
+	for _, kv := range result.Kvs {
+		if string(kv.Key) != key {
+			continue
+		}
+
+		// Only the first matching entry is considered.
+		loadErr := r.load(kv)
+		if loadErr == nil {
+			return nil
+		}
+
+		logger.V(1).Warning(loadErr)
+		logger.V(1).Noticef("falling back to default rollout percentage: %.1f%%", r.opts.Pct)
+		r.pct = r.opts.Pct
+		if r.opts.OnChange != nil {
+			r.opts.OnChange(r)
+		}
+		return loadErr
+	}
+	return nil
+}
+
+// load parses the value of an etcd key/value pair as a float and makes it the
+// rollout's current percentage, then invokes OnChange if set.
+//
+// Values outside [0, 100] are logged and clamped into that range. A value that
+// does not parse as a float yields an error and leaves the percentage
+// unchanged.
+func (r *rolloutImpl) load(kv *mvccpb.KeyValue) error {
+	pct, parseErr := strconv.ParseFloat(string(kv.Value), 64)
+	if parseErr != nil {
+		return fmt.Errorf(
+			"while refreshing rollout switch %q: failed to parse current chance from etcd key %q: %v",
+			r.Name(),
+			r.etcdKey(),
+			parseErr)
+	}
+
+	if outOfRange := pct < 0 || pct > 100.0; outOfRange {
+		logger.V(1).Warningf(
+			"while refreshing rollout switch %q: value out of range, clamping: %.3f",
+			r.Name(),
+			pct)
+
+		pct = max(0.0, min(pct, 100.0))
+	}
+
+	logger.Noticef("rollout of feature gate %q updated to %.1f%%", r.Name(), pct)
+	r.pct = pct
+	if notify := r.opts.OnChange; notify != nil {
+		notify(r)
+	}
+	return nil
+}
+
+// parseRollout handles one -rollout flag value of the form name=percentage.
+//
+// The name must belong to a registered rollout. The percentage is capped at
+// 100 and becomes both the rollout's default and its current percentage. A
+// malformed expression or an unknown name yields an error.
+func parseRollout(v string) error {
+	match := validFlag.FindStringSubmatch(v)
+	if len(match) < 3 {
+		return fmt.Errorf("not a valid feature flag override expression: %q", v)
+	}
+
+	key, pctStr := match[1], match[2]
+	if _, known := registry[key]; !known {
+		return fmt.Errorf("unknown feature flag: %q", key)
+	}
+
+	pct, err := strconv.ParseFloat(pctStr, 64)
+	if err != nil {
+		return fmt.Errorf("error parsing rollout percentage flag for %q: %v", key, err)
+	}
+
+	switch {
+	case pct > 100.0:
+		pct = 100.0
+	case pct < 0.0:
+		pct = 0.0
+	}
+
+	logger.V(1).Infof(
+		"rollout %q overridden to %.1f%% via flags",
+		key, pct)
+
+	target := registry[key].(*rolloutImpl)
+	target.opts.Pct, target.pct = pct, pct
+	return nil
+}
+
+// init registers the repeatable -rollout flag (syntax: name=percentage), which
+// is handled by parseRollout. The usage text lists the rollout names present
+// in the registry at the time init runs.
+// NOTE(review): the registry is only filled by New, and this package's init
+// runs before the initializers of packages that import it, so the list of
+// valid feature flags in the usage text appears to always be empty — confirm.
+func init() {
+ ff := slices.Sorted(maps.Keys(registry))
+ flag.Func("rollout", "syntax: flag=n (valid feature flags: "+strings.Join(ff, ", ")+")", parseRollout)
+}