]> go.fuhry.dev Git - runtime.git/commitdiff
metricbus: abstract transport
authorDan Fuhry <dan@fuhry.com>
Fri, 14 Nov 2025 18:00:00 +0000 (13:00 -0500)
committerDan Fuhry <dan@fuhry.com>
Fri, 14 Nov 2025 18:05:00 +0000 (13:05 -0500)
- rename metricbus/internal -> metricbus/mbserver due to import from //cmd/metricbus_server
- add httpserver transport
- add utils/rollout library for... controlling rollouts

metrics/metricbus/mbclient/BUILD.bazel
metrics/metricbus/mbclient/common.go [new file with mode: 0644]
metrics/metricbus/mbclient/conn.go
metrics/metricbus/mbclient/httpserver_client.go [new file with mode: 0644]
metrics/metricbus/mbclient/httpserver_metrics.go [new file with mode: 0644]
metrics/metricbus/mbclient/httpserver_utils.go [new file with mode: 0644]
metrics/metricbus/mbserver/BUILD.bazel [moved from metrics/metricbus/internal/BUILD.bazel with 75% similarity]
metrics/metricbus/mbserver/dbus_interface.go [moved from metrics/metricbus/internal/dbus_interface.go with 98% similarity]
metrics/metricbus/mbserver/server.go [moved from metrics/metricbus/internal/server.go with 99% similarity]
utils/rollout/rollout.go [new file with mode: 0644]

index 27d91d0a678768226e7c6d25101001072e36ccbf..7cbdcec74a14cea564ba4e1ef0c362695c0ab27d 100644 (file)
@@ -17,7 +17,7 @@ go_library(
     deps = [
         "//constants",
         "//metrics/metricbus",
-        "//metrics/metricbus/internal",
+        "//metrics/metricbus/mbserver",
         "//mtls",
         "//sd",
         "//utils/hostname",
diff --git a/metrics/metricbus/mbclient/common.go b/metrics/metricbus/mbclient/common.go
new file mode 100644 (file)
index 0000000..a7eb2ff
--- /dev/null
@@ -0,0 +1,21 @@
+package mbclient
+
+import (
+       "context"
+
+       "go.fuhry.dev/runtime/utils/rollout"
+)
+
+var httpRollout = rollout.New("metricbus_dbus_to_http", &rollout.Opts{Pct: 0})
+
+func NewService(ctx context.Context) MetricBusService {
+       return NewServiceWithDiscriminator(ctx, defaultMetricBusServiceDiscriminator)
+}
+
+func NewServiceWithDiscriminator(ctx context.Context, instanceDiscriminator string) MetricBusService {
+       if httpRollout.Enabled() {
+               return NewHTTPServiceWithDiscriminator(ctx, instanceDiscriminator)
+       } else {
+               return NewDBusServiceWithDiscriminator(ctx, instanceDiscriminator)
+       }
+}
index 2065ccde5cb9d81af5d32c3f994822d034e11651..d619ea399a36b603dd236fbc4d2045b5f9412f0d 100644 (file)
@@ -7,7 +7,7 @@ import (
 
        "github.com/godbus/dbus/v5"
        "go.fuhry.dev/runtime/metrics/metricbus"
-       mbinternal "go.fuhry.dev/runtime/metrics/metricbus/internal"
+       "go.fuhry.dev/runtime/metrics/metricbus/mbserver"
        "go.fuhry.dev/runtime/utils/log"
 )
 
@@ -24,7 +24,7 @@ type metricBusLowLevelClient struct {
 }
 
 func newDbusConnection(ctx context.Context) (*dbus.Conn, error) {
-       conn, err := mbinternal.DbusConn()
+       conn, err := mbserver.DbusConn()
        if err != nil {
                return nil, err
        }
diff --git a/metrics/metricbus/mbclient/httpserver_client.go b/metrics/metricbus/mbclient/httpserver_client.go
new file mode 100644 (file)
index 0000000..ee5aac5
--- /dev/null
@@ -0,0 +1,262 @@
+package mbclient
+
+import (
+       "context"
+       "fmt"
+       "net"
+       "net/http"
+       "slices"
+       "strings"
+       "sync"
+       "time"
+
+       "go.fuhry.dev/runtime/constants"
+       "go.fuhry.dev/runtime/metrics/metricbus"
+       "go.fuhry.dev/runtime/sd"
+       "go.fuhry.dev/runtime/utils/hostname"
+       "go.fuhry.dev/runtime/utils/log"
+)
+
+type metricBusHttpServiceImpl struct {
+       name     string
+       instance string
+
+       sdp    *sd.SDPublisher
+       server *http.Server
+       log    log.Logger
+
+       ctx context.Context
+
+       metrics map[string]httpMetric
+
+       mu sync.Mutex
+}
+
+func NewHTTPService(ctx context.Context) MetricBusService {
+       return NewHTTPServiceWithDiscriminator(ctx, defaultMetricBusServiceDiscriminator)
+}
+
+func NewHTTPServiceWithDiscriminator(ctx context.Context, instanceDiscriminator string) MetricBusService {
+       logger := log.WithPrefix(fmt.Sprintf("metricbus.httpserver_client[%s]", defaultMetricBusServiceName))
+
+       mux := http.NewServeMux()
+       mw := log.NewLoggingMiddlewareWithLogger(mux, logger)
+
+       sdShard := defaultMetricBusServiceName
+       if instanceDiscriminator != metricbus.SingletonInstanceDiscriminator && instanceDiscriminator != "" {
+               sdShard += "-" + instanceDiscriminator
+       }
+
+       svc := &metricBusHttpServiceImpl{
+               name:     defaultMetricBusServiceName,
+               instance: instanceDiscriminator,
+
+               sdp: &sd.SDPublisher{
+                       Protocol:  sd.ProtocolTCP,
+                       Service:   "otel-tls",
+                       ShardName: sdShard,
+               },
+               server: &http.Server{
+                       Handler: mw.HandlerFunc(),
+               },
+               log: logger,
+
+               ctx: ctx,
+
+               metrics: make(map[string]httpMetric),
+       }
+
+       mux.HandleFunc("/metrics", svc.metricsToString)
+       mux.HandleFunc("/healthz", svc.healthCheck)
+
+       go svc.run()
+
+       return svc
+}
+
+func (svc *metricBusHttpServiceImpl) run() {
+       const retryInterval = 5 * time.Second
+
+retry:
+       listener, addr, err := makeTlsListener(svc.ctx)
+       if err != nil {
+               svc.log.Errorf("failed to init tls listener, retrying in %s: %v", retryInterval, err)
+               time.Sleep(retryInterval)
+               goto retry
+       }
+       tcpAddr, ok := addr.(*net.TCPAddr)
+       if !ok {
+               listener.Close()
+               svc.log.Alertf("listener address is %T, not *net.TCPAddr", addr)
+               time.Sleep(retryInterval)
+               goto retry
+       }
+
+       go svc.server.Serve(listener)
+       svc.log.Noticef("serving metrics on %s", addr.String())
+       svc.sdp.AdvertisePort = uint16(tcpAddr.Port)
+
+       // try to publish on startup
+       err = svc.sdp.Publish(svc.ctx)
+
+       // if publishing fails, retry either until we succeed or we are asked to shutdown
+       if err != nil {
+               svc.log.Alertf("failed to publish sd service: %v", err)
+               ticker := time.NewTicker(retryInterval)
+       publishLoop:
+               for {
+                       select {
+                       case <-ticker.C:
+                               err = svc.sdp.Publish(svc.ctx)
+                               if err != nil {
+                                       svc.log.Alertf("failed to publish sd service: %v", err)
+                                       continue
+                               }
+                               break publishLoop
+                       case <-svc.ctx.Done():
+                               goto shutdown
+                       }
+               }
+       }
+
+       <-svc.ctx.Done()
+
+shutdown:
+       shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+       defer cancel()
+       svc.server.Shutdown(shutdownCtx)
+       svc.sdp.Unpublish()
+}
+
+func (s *metricBusHttpServiceImpl) metricsToString(w http.ResponseWriter, r *http.Request) {
+       w.Header().Set("content-type", "text/plain; version=0.0.4; charset=utf-8")
+
+       for metricName, metric := range s.metrics {
+               fmt.Fprintf(
+                       w,
+                       "# HELP %s %s\n",
+                       metricName,
+                       metric.Help(),
+               )
+               fmt.Fprintf(
+                       w,
+                       "# TYPE %s %s\n",
+                       metricName,
+                       metric.Type(),
+               )
+
+               baseLabelValues := []string{
+                       fmt.Sprintf("%s=%q", "_task", s.name),
+                       fmt.Sprintf("%s=%q", "_host", hostname.Fqdn()),
+               }
+               if s.instance != metricbus.SingletonInstanceDiscriminator {
+                       baseLabelValues = append(baseLabelValues, fmt.Sprintf("%s=%q", "_shard", s.instance))
+               }
+
+               for _, bucket := range metric.Get() {
+                       labels := make([]string, len(baseLabelValues))
+                       copy(labels, baseLabelValues)
+
+                       labels = append(labels, bucket.KV...)
+
+                       fmt.Fprintf(
+                               w,
+                               "%s{%s} %.2f\n",
+                               metricName,
+                               strings.Join(labels, ","),
+                               bucket.V,
+                       )
+               }
+
+               fmt.Fprintf(w, "\n")
+       }
+}
+
+func (s *metricBusHttpServiceImpl) healthCheck(w http.ResponseWriter, r *http.Request) {
+       w.WriteHeader(http.StatusOK)
+       w.Write([]byte(fmt.Sprintf("<h1>%s Metric Collector</h1>", constants.OrgName)))
+}
+
+func (*metricBusHttpServiceImpl) Flush() {
+}
+
+func (*metricBusHttpServiceImpl) FlushAndWait() {
+}
+
+func (s *metricBusHttpServiceImpl) DefineCounter(
+       metricName, descr string,
+       labelNames ...string,
+) CounterMetric {
+       s.mu.Lock()
+       defer s.mu.Unlock()
+
+       if m, ok := s.metrics[metricName]; ok {
+               if _, ok := m.(*httpCounterMetric); !ok {
+                       s.log.Panicf(
+                               "metric %q already defined as different metric type: %T",
+                               metricName,
+                               m,
+                       )
+               }
+
+               if !slices.Equal(m.LabelNames(), labelNames) {
+                       s.log.Panicf(
+                               "metric %q already defined with differing label names: %+v",
+                               metricName,
+                               labelNames,
+                       )
+               }
+
+               return m.(*httpCounterMetric)
+       }
+
+       counter := &httpCounterMetric{
+               httpBaseMetric: &httpBaseMetric{
+                       name:       metricName,
+                       descr:      descr,
+                       labelNames: labelNames,
+               },
+               values: make(map[bucketKey]*httpCounter),
+       }
+       s.metrics[metricName] = counter
+       return counter
+}
+
+func (s *metricBusHttpServiceImpl) DefineGauge(
+       metricName, descr string,
+       labelNames ...string,
+) GaugeMetric {
+       s.mu.Lock()
+       defer s.mu.Unlock()
+
+       if m, ok := s.metrics[metricName]; ok {
+               if _, ok := m.(*httpGaugeMetric); !ok {
+                       s.log.Panicf(
+                               "metric %q already defined as different metric type: %T",
+                               metricName,
+                               m,
+                       )
+               }
+
+               if !slices.Equal(m.LabelNames(), labelNames) {
+                       s.log.Panicf(
+                               "metric %q already defined with differing label names: %+v",
+                               metricName,
+                               labelNames,
+                       )
+               }
+
+               return m.(*httpGaugeMetric)
+       }
+
+       gauge := &httpGaugeMetric{
+               httpBaseMetric: &httpBaseMetric{
+                       name:       metricName,
+                       descr:      descr,
+                       labelNames: labelNames,
+               },
+               values: make(map[bucketKey]*httpGauge),
+       }
+       s.metrics[metricName] = gauge
+       return gauge
+}
diff --git a/metrics/metricbus/mbclient/httpserver_metrics.go b/metrics/metricbus/mbclient/httpserver_metrics.go
new file mode 100644 (file)
index 0000000..695361e
--- /dev/null
@@ -0,0 +1,190 @@
+package mbclient
+
+import (
+       "fmt"
+       "strings"
+       "sync"
+
+       "go.fuhry.dev/runtime/metrics/metricbus"
+)
+
+type bucket struct {
+       KV []string
+       V  float64
+}
+
+type bucketKey string
+
+type httpMetric interface {
+       Type() string
+       Help() string
+       Get() []bucket
+       LabelNames() []string
+}
+
+type httpBaseMetric struct {
+       name, descr string
+
+       labelNames []string
+}
+
+type httpCounterMetric struct {
+       *httpBaseMetric
+
+       values map[bucketKey]*httpCounter
+       mu     sync.RWMutex
+}
+
+type httpGaugeMetric struct {
+       *httpBaseMetric
+
+       values map[bucketKey]*httpGauge
+       mu     sync.RWMutex
+}
+
+type httpCounter struct {
+       v  float64
+       mu sync.Mutex
+}
+
+type httpGauge struct {
+       v  float64
+       mu sync.Mutex
+}
+
+func (k bucketKey) toKV(labelNames []string) []string {
+       var out []string
+
+       parts := strings.Split(string(k), string([]byte{0}))
+       for i, l := range labelNames {
+               out = append(out, fmt.Sprintf("%s=%q", l, parts[i]))
+       }
+       return out
+}
+
+func (m *httpBaseMetric) makeKey(kv metricbus.KV) bucketKey {
+       var values []string
+       for _, l := range m.labelNames {
+               v, _ := kv[l]
+               values = append(values, v)
+       }
+       return bucketKey(strings.Join(values, string([]byte{0})))
+}
+
+// Counter implementation
+func (m *httpCounterMetric) Type() string {
+       return "counter"
+}
+
+func (m *httpCounterMetric) Help() string {
+       return m.descr
+}
+
+func (m *httpCounterMetric) LabelNames() []string {
+       return m.labelNames
+}
+
+func (m *httpCounterMetric) Get() []bucket {
+       m.mu.RLock()
+       defer m.mu.RUnlock()
+
+       var out []bucket
+       for k, v := range m.values {
+               b := bucket{
+                       KV: k.toKV(m.labelNames),
+                       V:  v.v,
+               }
+               out = append(out, b)
+       }
+       return out
+}
+
+func (m *httpCounterMetric) Add(_ float64) {
+       panic("unsupported")
+}
+
+func (m *httpCounterMetric) WithLabelValues(kv metricbus.KV) CounterMetric {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+
+       k := m.httpBaseMetric.makeKey(kv)
+       if _, ok := m.values[k]; !ok {
+               m.values[k] = &httpCounter{}
+       }
+
+       return m.values[k]
+}
+
+func (c *httpCounter) Add(v float64) {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+
+       c.v += v
+}
+
+func (c *httpCounter) WithLabelValues(_ metricbus.KV) CounterMetric {
+       panic("unsupported")
+}
+
+// Gauge implementation
+func (m *httpGaugeMetric) Type() string {
+       return "gauge"
+}
+
+func (m *httpGaugeMetric) Help() string {
+       return m.descr
+}
+
+func (m *httpGaugeMetric) LabelNames() []string {
+       return m.labelNames
+}
+
+func (m *httpGaugeMetric) WithLabelValues(kv metricbus.KV) GaugeMetric {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+
+       k := m.httpBaseMetric.makeKey(kv)
+       if _, ok := m.values[k]; !ok {
+               m.values[k] = &httpGauge{}
+       }
+
+       return m.values[k]
+}
+
+func (m *httpGaugeMetric) Set(_ float64) {
+       panic("unsupported")
+}
+
+func (m *httpGaugeMetric) Reset() {
+       panic("unsupported")
+}
+
+func (m *httpGaugeMetric) Get() []bucket {
+       m.mu.RLock()
+       defer m.mu.RUnlock()
+
+       var out []bucket
+       for k, v := range m.values {
+               b := bucket{
+                       KV: k.toKV(m.labelNames),
+                       V:  v.v,
+               }
+               out = append(out, b)
+       }
+       return out
+}
+
+func (c *httpGauge) Set(v float64) {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+
+       c.v = v
+}
+
+func (c *httpGauge) Reset() {
+       c.Set(0)
+}
+
+func (c *httpGauge) WithLabelValues(_ metricbus.KV) GaugeMetric {
+       panic("unsupported")
+}
diff --git a/metrics/metricbus/mbclient/httpserver_utils.go b/metrics/metricbus/mbclient/httpserver_utils.go
new file mode 100644 (file)
index 0000000..c8b7ee9
--- /dev/null
@@ -0,0 +1,43 @@
+package mbclient
+
+import (
+       "context"
+       "crypto/tls"
+       "flag"
+       "net"
+
+       "go.fuhry.dev/runtime/metrics/metricbus/mbserver"
+       "go.fuhry.dev/runtime/mtls"
+)
+
+var httpIdentity = "otel-tls"
+
+func makeTlsListener(ctx context.Context) (net.Listener, net.Addr, error) {
+       mtlsId := mtls.NewServiceIdentity(httpIdentity)
+
+       netListener, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.IPv6zero, Port: int(mbserver.DefaultMetricBusHttpServerPort())})
+       if err != nil {
+               return nil, nil, err
+       }
+       tlsc, err := mtlsId.TlsConfig(ctx)
+       if err != nil {
+               return nil, nil, err
+       }
+       cv := mtls.NewPeerNameVerifier()
+       err = cv.ConfigureServer(tlsc)
+       if err != nil {
+               return nil, nil, err
+       }
+       cv.AllowFrom(mtls.Service, "prometheus")
+       cv.AllowFrom(mtls.Service, "healthcheck")
+
+       return tls.NewListener(netListener, tlsc), netListener.Addr(), nil
+}
+
+func init() {
+       flag.StringVar(
+               &httpIdentity,
+               "metricbus.mtls-id",
+               httpIdentity,
+               "mtls id for metrics http server")
+}
similarity index 75%
rename from metrics/metricbus/internal/BUILD.bazel
rename to metrics/metricbus/mbserver/BUILD.bazel
index 659a7690918752dc5560f3a30af63b088780a1d7..6e32ee3fc16992a117be07e40b8923e7d75529ff 100644 (file)
@@ -1,13 +1,13 @@
 load("@rules_go//go:def.bzl", "go_library")
 
 go_library(
-    name = "internal",
+    name = "mbserver",
     srcs = [
         "dbus_interface.go",
         "server.go",
     ],
-    importpath = "go.fuhry.dev/runtime/metrics/metricbus/internal",
-    visibility = ["//metrics/metricbus:__subpackages__"],
+    importpath = "go.fuhry.dev/runtime/metrics/metricbus/mbserver",
+    visibility = ["//visibility:public"],
     deps = [
         "//constants",
         "//metrics/metricbus",
similarity index 98%
rename from metrics/metricbus/internal/dbus_interface.go
rename to metrics/metricbus/mbserver/dbus_interface.go
index 58e25d7ab98c2c12f2d1f8f28292fcf0ba4dd958..0a021fc38d5d423d8a0d55c2f58a027a95ea2b95 100644 (file)
@@ -1,4 +1,4 @@
-package internal
+package mbserver
 
 import (
        "flag"
similarity index 99%
rename from metrics/metricbus/internal/server.go
rename to metrics/metricbus/mbserver/server.go
index fd6dc50485f07347568e25ff233599cb5016d255..59c9a118cd4fd45f8305c3ca096d66125e23d0da 100644 (file)
@@ -1,4 +1,4 @@
-package internal
+package mbserver
 
 import (
        "context"
diff --git a/utils/rollout/rollout.go b/utils/rollout/rollout.go
new file mode 100644 (file)
index 0000000..7974ce5
--- /dev/null
@@ -0,0 +1,315 @@
+package rollout
+
+// Package rollout allows for features or behavior changes to be rolled out incrementally.
+//
+// The rollout is based on a percentage chance. The percentage defaults to zero but can be
+// initialized to any hardcoded value or overridden through flags.
+//
+// When a rollout is
+
+import (
+       "context"
+       "flag"
+       "fmt"
+       "maps"
+       "math/rand"
+       "regexp"
+       "slices"
+       "strconv"
+       "strings"
+       "sync"
+       "time"
+
+       "go.etcd.io/etcd/api/v3/mvccpb"
+       etcd_client "go.etcd.io/etcd/client/v3"
+
+       "go.fuhry.dev/runtime/sd"
+       "go.fuhry.dev/runtime/utils/log"
+)
+
+type Rollout interface {
+       Enabled() bool
+       Name() string
+       Pct() float64
+}
+
+type Opts struct {
+       Pct      float64
+       OnChange func(Rollout)
+}
+
+type rolloutImpl struct {
+       name     string
+       opts     Opts
+       pct      float64
+       initMu   sync.Mutex
+       initOnce sync.Once
+}
+
+const (
+       nameFragment = `[a-z0-9](?:[a-z0-9_]*?[a-z0-9])?`
+       opTimeout    = 1 * time.Second
+)
+
+var registry map[string]Rollout
+
+var defaultOpts = Opts{
+       Pct: 0.0,
+}
+
+var validName = regexp.MustCompile("^" + nameFragment + "$")
+var validFlag = regexp.MustCompile("^(" + nameFragment + `)=([0-9]+(\.[0-9]+)?)$`)
+var logger = log.WithPrefix("rollout")
+
+func New(name string, opts *Opts) Rollout {
+       if !validName.MatchString(name) {
+               log.Panicf("rollout switch name must consist only of lowercase letters, numbers and underscores: %q", name)
+               return nil
+       }
+
+       if _, ok := registry[name]; ok {
+               log.Panicf("rollout %q registered more than once", name)
+               return nil
+       }
+
+       if opts == nil {
+               opts = &defaultOpts
+       }
+       r := &rolloutImpl{
+               name: name,
+               opts: *opts,
+               pct:  opts.Pct,
+       }
+       r.initMu.Lock()
+
+       if registry == nil {
+               registry = make(map[string]Rollout)
+       }
+       registry[name] = r
+
+       logger.V(1).Infof(
+               "rollout %q initialized at %.1f%%",
+               name, r.pct)
+
+       go r.watch()
+
+       return r
+}
+
+func (r *rolloutImpl) Name() string {
+       return r.name
+}
+
+func (r *rolloutImpl) Enabled() bool {
+       r.waitInit()
+       // hardcode >100 and <0 case
+       if r.opts.Pct >= 100.0 {
+               return true
+       } else if r.opts.Pct <= 0.0 {
+               return false
+       }
+
+       return (100 * rand.Float64()) >= r.pct
+}
+
+func (r *rolloutImpl) Pct() float64 {
+       // clamp to 0 <= v <= 100
+       if r.opts.Pct >= 100.0 {
+               return 100.0
+       } else if r.opts.Pct <= 0.0 {
+               return 0.0
+       }
+
+       return r.pct
+}
+
+func (r *rolloutImpl) waitInit() {
+       r.initMu.Lock()
+       r.initMu.Unlock()
+}
+
+func (r *rolloutImpl) watch() {
+       const retryInterval = 5 * time.Second
+
+       var etcd *etcd_client.Client
+       var err error
+       var watcher etcd_client.WatchChan
+
+       ctx := context.Background()
+       var wCtx context.Context
+       var wCancel context.CancelFunc
+
+       for {
+               if !flag.Parsed() {
+                       // wait until flags are parsed to init the watcher
+                       time.Sleep(50 * time.Millisecond)
+                       continue
+               }
+
+               if etcd == nil {
+                       etcd, err = sd.NewDefaultEtcdClient()
+                       if err != nil {
+                               logger.V(1).Warningf(
+                                       "failed to init etcd client, retrying in %s: %v",
+                                       retryInterval,
+                                       err)
+
+                               time.Sleep(5 * time.Second)
+                               continue
+                       }
+
+                       r.refresh(etcd)
+                       r.initOnce.Do(func() { r.initMu.Unlock() })
+               }
+
+               if watcher == nil {
+                       wCtx, wCancel = context.WithCancel(etcd_client.WithRequireLeader(ctx))
+                       watcher = etcd.Watch(
+                               wCtx,
+                               r.etcdKey())
+               }
+
+               for {
+                       select {
+                       case event := <-watcher:
+                               if err := event.Err(); err != nil {
+                                       wCancel()
+                                       watcher = nil
+
+                                       if !sd.ErrorIsRecoverable(err) {
+                                               etcd.Close()
+                                               etcd = nil
+                                       }
+                                       break
+                               }
+
+                               for _, ev := range event.Events {
+                                       if string(ev.Kv.Key) != r.etcdKey() {
+                                               continue
+                                       }
+                                       switch ev.Type.String() {
+                                       case "PUT":
+                                               r.load(ev.Kv)
+                                       case "DELETE":
+                                               logger.V(1).Noticef(
+                                                       "etcd override for rollout %q was deleted, falling back to default "+
+                                                               "rollout of %.1f%%",
+                                                       r.Name(),
+                                                       r.opts.Pct)
+                                               r.pct = r.opts.Pct
+                                               if r.opts.OnChange != nil {
+                                                       r.opts.OnChange(r)
+                                               }
+                                       }
+                               }
+                       case <-ctx.Done():
+                               wCancel()
+                               etcd.Close()
+                               return
+                       }
+               }
+       }
+}
+
+func (r *rolloutImpl) etcdKey() string {
+       return fmt.Sprintf("/rollout/%s", r.name)
+}
+
+func (r *rolloutImpl) refresh(client *etcd_client.Client) error {
+       ctx, cancel := context.WithTimeout(context.Background(), opTimeout)
+       defer cancel()
+
+       result, err := client.Get(ctx, r.etcdKey())
+
+       if err != nil {
+               err = fmt.Errorf(
+                       "while refreshing rollout switch %q: failed to read etcd key %q: %v",
+                       r.Name(),
+                       r.etcdKey(),
+                       err)
+               logger.V(1).Warning(err)
+               return err
+       }
+
+       for _, kv := range result.Kvs {
+               if string(kv.Key) != r.etcdKey() {
+                       continue
+               }
+
+               err := r.load(kv)
+               if err != nil {
+                       logger.V(1).Warning(err)
+                       logger.V(1).Noticef("falling back to default rollout percentage: %.1f%%", r.opts.Pct)
+                       r.pct = r.opts.Pct
+                       if r.opts.OnChange != nil {
+                               r.opts.OnChange(r)
+                       }
+                       return err
+               }
+               return err
+       }
+       return nil
+}
+
+func (r *rolloutImpl) load(kv *mvccpb.KeyValue) error {
+       v, err := strconv.ParseFloat(string(kv.Value), 64)
+       if err != nil {
+               return fmt.Errorf(
+                       "while refreshing rollout switch %q: failed to parse current chance from "+
+                               "etcd key %q: %v",
+                       r.Name(),
+                       r.etcdKey(),
+                       err)
+       }
+
+       if v < 0 || v > 100.0 {
+               logger.V(1).Warningf(
+                       "while refreshing rollout switch %q: value out of range, clamping: %.3f",
+                       r.Name(),
+                       v)
+
+               if v < 0 {
+                       v = 0.0
+               } else if v > 100.0 {
+                       v = 100.0
+               }
+       }
+
+       logger.Noticef("rollout of feature gate %q updated to %.1f%%", r.Name(), v)
+       r.pct = v
+       if r.opts.OnChange != nil {
+               r.opts.OnChange(r)
+       }
+       return nil
+}
+
+func parseRollout(v string) error {
+       if match := validFlag.FindStringSubmatch(v); len(match) >= 3 {
+               key, pctStr := match[1], match[2]
+               if _, ok := registry[key]; !ok {
+                       return fmt.Errorf("unknown feature flag: %q", key)
+               }
+               pct, err := strconv.ParseFloat(pctStr, 64)
+               if err != nil {
+                       return fmt.Errorf("error parsing rollout percentage flag for %q: %v", key, err)
+               }
+               if pct > 100.0 {
+                       pct = 100.0
+               } else if pct < 0.0 {
+                       pct = 0.0
+               }
+               logger.V(1).Infof(
+                       "rollout %q overridden to %.1f%% via flags",
+                       key, pct)
+
+               r := registry[key].(*rolloutImpl)
+               r.opts.Pct, r.pct = pct, pct
+               return nil
+       }
+       return fmt.Errorf("not a valid feature flag override expression: %q", v)
+}
+
+func init() {
+       ff := slices.Sorted(maps.Keys(registry))
+       flag.Func("rollout", "syntax: flag=n (valid feature flags: "+strings.Join(ff, ", ")+")", parseRollout)
+}