"fmt"
"math/rand"
"net/url"
+ "strings"
lru "github.com/hashicorp/golang-lru/v2"
grpc_quic "go.fuhry.dev/grpc-quic"
"go.fuhry.dev/runtime/grpc/internal/acl"
"go.fuhry.dev/runtime/grpc/internal/common"
+ "go.fuhry.dev/runtime/metrics/metricbus"
+ "go.fuhry.dev/runtime/metrics/metricbus/mbclient"
"go.fuhry.dev/runtime/mtls"
"go.fuhry.dev/runtime/mtls/certutil"
"go.fuhry.dev/runtime/sd"
sessions *lru.Cache[string, *session]
connFac common.ConnectionFactory
hc HealthCheckServicer
+ stats *serverStats
+}
+
+type serverStats struct {
+ calls mbclient.CounterMetric
}
type ServerOption = option.Option[*Server]
func (s *Server) PublishAndServe(ctx context.Context, callback func(*grpc.Server)) error {
s.log.Noticef("starting %s service on port %d", s.identity.Name(), s.port)
+ mbService := mbclient.NewServiceWithDiscriminator(ctx, s.identity.Name())
+ s.stats = newServerStats(mbService)
+
tc, err := s.identity.TlsConfig(ctx)
if err != nil {
return err
}
}
-func (s *Server) handleConnection(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
+func (s *Server) logCallMetrics(fullMethod string, err error) {
+ methodParts := strings.Split(strings.Trim(fullMethod, "/"), "/")
+ if len(methodParts) != 2 {
+ return
+ }
+
+ service, method := methodParts[0], methodParts[1]
+ kv := metricbus.KV{
+ "service": service,
+ "method": method,
+ "status": "ok",
+ }
+
+ if err != nil {
+ if status, ok := status.FromError(err); ok {
+ kv["status"] = status.Code().String()
+ } else {
+ kv["status"] = "non_grpc_error"
+ }
+ }
+
+ s.stats.calls.WithLabelValues(kv).Add(1)
+}
+
+func (s *Server) handleConnection(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (ret interface{}, outErr error) {
+ defer s.logCallMetrics(info.FullMethod, outErr)
+
if info.FullMethod == "/grpc.health.v1.Health/Check" {
- return handler(ctx, req)
+ ret, outErr = handler(ctx, req)
+ return
}
spiffe, err := PeerIdentity(ctx)
if err != nil {
- return nil, err
+ outErr = err
+ return
}
if err := s.acl.Check(info.FullMethod, spiffe); err != nil {
- return nil, status.Errorf(codes.PermissionDenied, err.Error())
+ outErr = status.Errorf(codes.PermissionDenied, err.Error())
+ return
}
serverCtx := context.WithValue(ctx, kServer, s)
- return handler(serverCtx, req)
+ ret, outErr = handler(serverCtx, req)
+ return
}
-func (s *Server) handleStreamConnection(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
+func (s *Server) handleStreamConnection(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (outErr error) {
+ defer s.logCallMetrics(info.FullMethod, outErr)
+
if info.FullMethod == "/grpc.health.v1.Health/Watch" {
- return handler(srv, ss)
+ outErr = handler(srv, ss)
+ return
}
ctx := ss.Context()
spiffe, err := PeerIdentity(ctx)
if err != nil {
- return err
+ outErr = err
+ return
}
if err := s.acl.Check(info.FullMethod, spiffe); err != nil {
- return status.Errorf(codes.PermissionDenied, err.Error())
+ outErr = status.Errorf(codes.PermissionDenied, err.Error())
+ return
}
- return handler(srv, ss)
+ outErr = handler(srv, ss)
+ return
}
func PeerCertificate(ctx context.Context) (*x509.Certificate, error) {
return nil, status.Errorf(codes.PermissionDenied, "could not determine your SPIFFEID from your certificate")
}
+func newServerStats(svc mbclient.MetricBusService) *serverStats {
+ return &serverStats{
+ calls: svc.DefineCounter("grpc_calls", "calls to gRPC methods", "service", "method", "status"),
+ }
+}
+
func init() {
defaultPort = flag.Uint("grpc.port", RandomPort(), "port for server to listen on")
}