From: Dan Fuhry
Date: Sat, 14 Mar 2026 23:38:00 +0000 (-0400)
Subject: [grpc/server] instrument grpc servers
X-Git-Url: https://go.fuhry.dev/?a=commitdiff_plain;h=31c9166e6c8331d957af4bd2d4fea6fa806c8d58;p=runtime.git

[grpc/server] instrument grpc servers
---
Review notes (ignored by patch tools):
  * The deferred logCallMetrics calls are wrapped in closures. Arguments of a
    deferred call are evaluated when the defer statement executes, so passing
    outErr directly always recorded status "ok".
  * status.Errorf(code, err.Error()) is replaced by status.Error so the ACL
    message is not interpreted as a format string.
  * Doc comments were added and hunk line counts adjusted to match. The index
    hashes below are those of the original commit.

diff --git a/grpc/internal/server/BUILD.bazel b/grpc/internal/server/BUILD.bazel
index c2b13a3..ff78835 100644
--- a/grpc/internal/server/BUILD.bazel
+++ b/grpc/internal/server/BUILD.bazel
@@ -12,6 +12,8 @@ go_library(
     deps = [
         "//grpc/internal/acl",
         "//grpc/internal/common",
+        "//metrics/metricbus",
+        "//metrics/metricbus/mbclient",
         "//mtls",
         "//mtls/certutil",
         "//sd",
diff --git a/grpc/internal/server/server.go b/grpc/internal/server/server.go
index 726e4e2..2892e8b 100644
--- a/grpc/internal/server/server.go
+++ b/grpc/internal/server/server.go
@@ -9,6 +9,7 @@ import (
 	"fmt"
 	"math/rand"
 	"net/url"
+	"strings"
 
 	lru "github.com/hashicorp/golang-lru/v2"
 	grpc_quic "go.fuhry.dev/grpc-quic"
@@ -21,6 +22,8 @@ import (
 
 	"go.fuhry.dev/runtime/grpc/internal/acl"
 	"go.fuhry.dev/runtime/grpc/internal/common"
+	"go.fuhry.dev/runtime/metrics/metricbus"
+	"go.fuhry.dev/runtime/metrics/metricbus/mbclient"
 	"go.fuhry.dev/runtime/mtls"
 	"go.fuhry.dev/runtime/mtls/certutil"
 	"go.fuhry.dev/runtime/sd"
@@ -41,6 +44,13 @@ type Server struct {
 	sessions *lru.Cache[string, *session]
 	connFac  common.ConnectionFactory
 	hc       HealthCheckServicer
+	stats    *serverStats
+}
+
+// serverStats groups the metrics a Server records.
+type serverStats struct {
+	// calls counts gRPC calls, labeled by service, method and status.
+	calls mbclient.CounterMetric
 }
 
 type ServerOption = option.Option[*Server]
@@ -130,6 +140,9 @@ func NewGrpcServerWithPort(id mtls.Identity, port uint16, opts ...ServerOption)
 func (s *Server) PublishAndServe(ctx context.Context, callback func(*grpc.Server)) error {
 	s.log.Noticef("starting %s service on port %d", s.identity.Name(), s.port)
 
+	mbService := mbclient.NewServiceWithDiscriminator(ctx, s.identity.Name())
+	s.stats = newServerStats(mbService)
+
 	tc, err := s.identity.TlsConfig(ctx)
 	if err != nil {
 		return err
@@ -192,41 +205,94 @@ func (s *Server) Stop() {
 	}
 }
 
-func (s *Server) handleConnection(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+// logCallMetrics increments the grpc_calls counter for one finished RPC.
+//
+// fullMethod is split on "/" after trimming the surrounding slashes; unless
+// that yields exactly a service and a method, nothing is recorded. The status
+// label is "ok" when err is nil, the gRPC code name when status.FromError
+// recognizes err, and "non_grpc_error" otherwise.
+func (s *Server) logCallMetrics(fullMethod string, err error) {
+	methodParts := strings.Split(strings.Trim(fullMethod, "/"), "/")
+	if len(methodParts) != 2 {
+		return
+	}
+
+	service, method := methodParts[0], methodParts[1]
+	kv := metricbus.KV{
+		"service": service,
+		"method":  method,
+		"status":  "ok",
+	}
+
+	if err != nil {
+		// Named st so the local does not shadow the status package.
+		if st, ok := status.FromError(err); ok {
+			kv["status"] = st.Code().String()
+		} else {
+			kv["status"] = "non_grpc_error"
+		}
+	}
+
+	s.stats.calls.WithLabelValues(kv).Add(1)
+}
+
+// handleConnection authorizes and dispatches a unary RPC, recording its
+// outcome with logCallMetrics. Health checks skip the identity and ACL checks.
+func (s *Server) handleConnection(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (ret interface{}, outErr error) {
+	// Arguments of a deferred call are evaluated when the defer statement
+	// executes, so outErr is read inside a closure to see its final value.
+	defer func() { s.logCallMetrics(info.FullMethod, outErr) }()
+
 	if info.FullMethod == "/grpc.health.v1.Health/Check" {
-		return handler(ctx, req)
+		ret, outErr = handler(ctx, req)
+		return
 	}
 
 	spiffe, err := PeerIdentity(ctx)
 	if err != nil {
-		return nil, err
+		outErr = err
+		return
 	}
 
 	if err := s.acl.Check(info.FullMethod, spiffe); err != nil {
-		return nil, status.Errorf(codes.PermissionDenied, err.Error())
+		// status.Error, not Errorf: the ACL message is data, not a format string.
+		outErr = status.Error(codes.PermissionDenied, err.Error())
+		return
 	}
 
 	serverCtx := context.WithValue(ctx, kServer, s)
 
-	return handler(serverCtx, req)
+	ret, outErr = handler(serverCtx, req)
+	return
 }
 
-func (s *Server) handleStreamConnection(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+// handleStreamConnection authorizes and dispatches a streaming RPC, recording
+// its outcome with logCallMetrics. Health watches skip the identity and ACL
+// checks.
+func (s *Server) handleStreamConnection(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (outErr error) {
+	// Closure for the same reason as in handleConnection: outErr must be read
+	// when the deferred function runs, not when it is registered.
+	defer func() { s.logCallMetrics(info.FullMethod, outErr) }()
+
 	if info.FullMethod == "/grpc.health.v1.Health/Watch" {
-		return handler(srv, ss)
+		outErr = handler(srv, ss)
+		return
 	}
 
 	ctx := ss.Context()
 	spiffe, err := PeerIdentity(ctx)
 	if err != nil {
-		return err
+		outErr = err
+		return
 	}
 
 	if err := s.acl.Check(info.FullMethod, spiffe); err != nil {
-		return status.Errorf(codes.PermissionDenied, err.Error())
+		outErr = status.Error(codes.PermissionDenied, err.Error())
+		return
 	}
 
-	return handler(srv, ss)
+	outErr = handler(srv, ss)
+	return
 }
 
 func PeerCertificate(ctx context.Context) (*x509.Certificate, error) {
@@ -294,6 +360,13 @@ func PeerIdentity(ctx context.Context) (*url.URL, error) {
 	return nil, status.Errorf(codes.PermissionDenied, "could not determine your SPIFFEID from your certificate")
 }
 
+// newServerStats defines the server's metrics on svc.
+func newServerStats(svc mbclient.MetricBusService) *serverStats {
+	return &serverStats{
+		calls: svc.DefineCounter("grpc_calls", "calls to gRPC methods", "service", "method", "status"),
+	}
+}
+
 func init() {
 	defaultPort = flag.Uint("grpc.port", RandomPort(), "port for server to listen on")
 }