]> go.fuhry.dev Git - runtime.git/commitdiff
[sd] several bugfixes incl. http healthchecker memory leak
authorDan Fuhry <dan@fuhry.com>
Fri, 4 Apr 2025 03:47:25 +0000 (23:47 -0400)
committerDan Fuhry <dan@fuhry.com>
Fri, 4 Apr 2025 03:47:25 +0000 (23:47 -0400)
sd/healthcheck_http.go
sd/publish.go
utils/log/log.go

index 773af2b6c0c87663b1507916eb0f9faddbbf9ab7..bc55e4f73890d82a3a2c06e8931e6cc6695d0057 100644 (file)
@@ -136,6 +136,7 @@ func (hcs *httpHealthCheckService) Check() error {
        if err != nil {
                return err
        }
+       resp.Body.Close()
 
        if resp.StatusCode >= 200 && resp.StatusCode < 400 {
                return nil
index abe5c5bdde8ea6f68ea2113bfd27d23c6302d9a1..c1c19c83219f385b03f5b213527dafc5db6b736c 100644 (file)
@@ -14,6 +14,8 @@ import (
        "go.fuhry.dev/runtime/utils"
        "go.fuhry.dev/runtime/utils/hostname"
        "go.fuhry.dev/runtime/utils/log"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
 )
 
 const (
@@ -36,8 +38,8 @@ type SDPublisher struct {
        IP6           string
        EtcdClient    *etcd_client.Client
 
-       mtx *sync.Mutex
-       wg  *sync.WaitGroup
+       mtx sync.Mutex
+       wg  sync.WaitGroup
 
        logger log.Logger
        leases etcd_client.Lease
@@ -53,15 +55,10 @@ type recordToPublish struct {
 }
 
 func (s *SDPublisher) init() error {
-       if s.mtx == nil {
-               s.mtx = &sync.Mutex{}
-               s.wg = &sync.WaitGroup{}
-       }
-
        s.mtx.Lock()
        defer s.mtx.Unlock()
 
-       s.logger = log.WithPrefix(fmt.Sprintf("SDPublish(%s)", s.Service))
+       s.logger = log.Default().WithPrefix(fmt.Sprintf("SDPublish(%s)", s.Service))
 
        if s.EtcdClient == nil {
                cl, err := NewDefaultEtcdClient()
@@ -109,7 +106,7 @@ func (s *SDPublisher) init() error {
                s.LocalRegion = hostname.RegionName()
        }
 
-       if s.Regions == nil || len(s.Regions) == 0 {
+       if len(s.Regions) == 0 {
                s.Regions = []string{
                        hostname.RegionName(),
                }
@@ -214,27 +211,23 @@ func (s *SDPublisher) unpublish() {
 }
 
 func (s *SDPublisher) recordsToPublish() []recordToPublish {
-       records := make([]recordToPublish, 0)
-
-       records = append(records, recordToPublish{
-               rtype: "SRV",
-               path:  s.srvRecordPath(),
-               val:   s.srvRecordJson(s.LocalRegion),
-       })
-
-       records = append(records, recordToPublish{
-               rtype: "A",
-               path:  s.aRecordPath(),
-               val:   s.aRecordJson(),
-       })
-
-       records = append(records, recordToPublish{
-               rtype: "AAAA",
-               path:  s.aaaaRecordPath(),
-               val:   s.aaaaRecordJson(),
-       })
-
-       return records
+       return []recordToPublish{
+               {
+                       rtype: "SRV",
+                       path:  s.srvRecordPath(),
+                       val:   s.srvRecordJson(s.LocalRegion),
+               },
+               {
+                       rtype: "A",
+                       path:  s.aRecordPath(),
+                       val:   s.aRecordJson(),
+               },
+               {
+                       rtype: "AAAA",
+                       path:  s.aaaaRecordPath(),
+                       val:   s.aaaaRecordJson(),
+               },
+       }
 }
 
 func (s *SDPublisher) renewalLoop() {
@@ -263,19 +256,34 @@ func (s *SDPublisher) renewalTick() {
        var err error
        var kar *etcd_client.LeaseKeepAliveResponse
 
-       kar, err = nil, nil
+       validLease := false
        if s.lease != nil {
                s.logger.V(3).Debugf("Doing keepalive for service %s on %s", s.Service, s.AdvertiseHost)
-               kar, err = s.leases.KeepAliveOnce(s.ctx, s.lease.ID)
-       } else {
-               s.logger.V(1).Debugf("no active lease for service %s on %s, attempting to acquire one", s.Service, s.AdvertiseHost)
+               kaCtx, cancel := context.WithTimeout(s.ctx, 3*time.Second)
+               defer cancel()
+               kar, err = s.leases.KeepAliveOnce(kaCtx, s.lease.ID)
+               if err == nil && kar != nil && kar.ID != etcd_client.NoLease {
+                       validLease = true
+                       for _, record := range s.recordsToPublish() {
+                               kv := etcd_client.NewKV(s.EtcdClient)
+                               _, err := kv.Get(s.ctx, record.path)
+                               if e, ok := status.FromError(err); ok && e != nil {
+                                       if e.Code() == codes.NotFound {
+                                               validLease = false
+                                               break
+                                       }
+                               }
+                       }
+               }
        }
-       if kar == nil || err != nil || (kar != nil && kar.ID == etcd_client.NoLease) {
+
+       if !validLease {
+               s.logger.V(1).Debugf("no active lease for service %s on %s, attempting to acquire one", s.Service, s.AdvertiseHost)
                // we lost our lease - get a new one
                s.lease = nil
                lease, err := s.leases.Grant(s.ctx, leaseMaxLifetime)
                if lease == nil || err != nil {
-                       s.logger.Debugf("warning: lost our lease and failed to get a new one. ")
+                       s.logger.Debugf("warning: lost our lease and failed to get a new one.")
                        return
                }
                s.logger.V(2).Debugf("leaseID: %016x", lease.ID)
index 4ee6f05a14e01f76e0acccbbdbea784ea4f1ed41..e753541532dd8a4e9dded42224eab6f3e3130d87 100644 (file)
@@ -218,7 +218,7 @@ func (l *logger) WithPrefix(prefix string) Logger {
        return &logger{
                internalLogger: &internalLogger{
                        Logger: l.Logger,
-                       prefix: l.internalLogger.prefix,
+                       prefix: prefix,
                        level:  l.level,
                },
        }