]> go.fuhry.dev Git - runtime.git/commitdiff
[grpc/server] instrument grpc servers
authorDan Fuhry <dan@fuhry.com>
Sat, 14 Mar 2026 23:38:00 +0000 (19:38 -0400)
committerDan Fuhry <dan@fuhry.com>
Sun, 15 Mar 2026 01:17:54 +0000 (21:17 -0400)
grpc/internal/server/BUILD.bazel
grpc/internal/server/server.go

index c2b13a3d0bb94e083f20973dd8e51aa54c947c1a..ff7883599eb5fd964cc323ef51f7987849730c08 100644 (file)
@@ -12,6 +12,8 @@ go_library(
     deps = [
         "//grpc/internal/acl",
         "//grpc/internal/common",
+        "//metrics/metricbus",
+        "//metrics/metricbus/mbclient",
         "//mtls",
         "//mtls/certutil",
         "//sd",
index 726e4e24b5abd55bbe78cb94aa9f1c61e47c672c..2892e8bab784d4d030bf98f10dfd33c8c1c60bc8 100644 (file)
@@ -9,6 +9,7 @@ import (
        "fmt"
        "math/rand"
        "net/url"
+       "strings"
 
        lru "github.com/hashicorp/golang-lru/v2"
        grpc_quic "go.fuhry.dev/grpc-quic"
@@ -21,6 +22,8 @@ import (
 
        "go.fuhry.dev/runtime/grpc/internal/acl"
        "go.fuhry.dev/runtime/grpc/internal/common"
+       "go.fuhry.dev/runtime/metrics/metricbus"
+       "go.fuhry.dev/runtime/metrics/metricbus/mbclient"
        "go.fuhry.dev/runtime/mtls"
        "go.fuhry.dev/runtime/mtls/certutil"
        "go.fuhry.dev/runtime/sd"
@@ -41,6 +44,11 @@ type Server struct {
        sessions   *lru.Cache[string, *session]
        connFac    common.ConnectionFactory
        hc         HealthCheckServicer
+       stats      *serverStats
+}
+
+type serverStats struct {
+       calls mbclient.CounterMetric
 }
 
 type ServerOption = option.Option[*Server]
@@ -130,6 +138,9 @@ func NewGrpcServerWithPort(id mtls.Identity, port uint16, opts ...ServerOption)
 func (s *Server) PublishAndServe(ctx context.Context, callback func(*grpc.Server)) error {
        s.log.Noticef("starting %s service on port %d", s.identity.Name(), s.port)
 
+       mbService := mbclient.NewServiceWithDiscriminator(ctx, s.identity.Name())
+       s.stats = newServerStats(mbService)
+
        tc, err := s.identity.TlsConfig(ctx)
        if err != nil {
                return err
@@ -192,41 +203,77 @@ func (s *Server) Stop() {
        }
 }
 
-func (s *Server) handleConnection(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+func (s *Server) logCallMetrics(fullMethod string, err error) {
+       methodParts := strings.Split(strings.Trim(fullMethod, "/"), "/")
+       if len(methodParts) != 2 {
+               return
+       }
+
+       service, method := methodParts[0], methodParts[1]
+       kv := metricbus.KV{
+               "service": service,
+               "method":  method,
+               "status":  "ok",
+       }
+
+       if err != nil {
+               if status, ok := status.FromError(err); ok {
+                       kv["status"] = status.Code().String()
+               } else {
+                       kv["status"] = "non_grpc_error"
+               }
+       }
+
+       s.stats.calls.WithLabelValues(kv).Add(1)
+}
+
+func (s *Server) handleConnection(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (ret interface{}, outErr error) {
+       defer s.logCallMetrics(info.FullMethod, outErr)
+
        if info.FullMethod == "/grpc.health.v1.Health/Check" {
-               return handler(ctx, req)
+               ret, outErr = handler(ctx, req)
+               return
        }
 
        spiffe, err := PeerIdentity(ctx)
        if err != nil {
-               return nil, err
+               outErr = err
+               return
        }
 
        if err := s.acl.Check(info.FullMethod, spiffe); err != nil {
-               return nil, status.Errorf(codes.PermissionDenied, err.Error())
+               outErr = status.Errorf(codes.PermissionDenied, err.Error())
+               return
        }
 
        serverCtx := context.WithValue(ctx, kServer, s)
 
-       return handler(serverCtx, req)
+       ret, outErr = handler(serverCtx, req)
+       return
 }
 
-func (s *Server) handleStreamConnection(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+func (s *Server) handleStreamConnection(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (outErr error) {
+       defer s.logCallMetrics(info.FullMethod, outErr)
+
        if info.FullMethod == "/grpc.health.v1.Health/Watch" {
-               return handler(srv, ss)
+               outErr = handler(srv, ss)
+               return
        }
 
        ctx := ss.Context()
        spiffe, err := PeerIdentity(ctx)
        if err != nil {
-               return err
+               outErr = err
+               return
        }
 
        if err := s.acl.Check(info.FullMethod, spiffe); err != nil {
-               return status.Errorf(codes.PermissionDenied, err.Error())
+               outErr = status.Errorf(codes.PermissionDenied, err.Error())
+               return
        }
 
-       return handler(srv, ss)
+       outErr = handler(srv, ss)
+       return
 }
 
 func PeerCertificate(ctx context.Context) (*x509.Certificate, error) {
@@ -294,6 +341,12 @@ func PeerIdentity(ctx context.Context) (*url.URL, error) {
        return nil, status.Errorf(codes.PermissionDenied, "could not determine your SPIFFEID from your certificate")
 }
 
+func newServerStats(svc mbclient.MetricBusService) *serverStats {
+       return &serverStats{
+               calls: svc.DefineCounter("grpc_calls", "calls to gRPC methods", "service", "method", "status"),
+       }
+}
+
 func init() {
        defaultPort = flag.Uint("grpc.port", RandomPort(), "port for server to listen on")
 }