From: Dan Fuhry Date: Fri, 4 Apr 2025 03:47:25 +0000 (-0400) Subject: [sd] several bugfixes incl. http healthchecker memory leak X-Git-Url: https://go.fuhry.dev/?a=commitdiff_plain;h=3419cb34185da5fec9be48553a2554eb2578ddee;p=runtime.git [sd] several bugfixes incl. http healthchecker memory leak --- diff --git a/sd/healthcheck_http.go b/sd/healthcheck_http.go index 773af2b..bc55e4f 100644 --- a/sd/healthcheck_http.go +++ b/sd/healthcheck_http.go @@ -136,6 +136,7 @@ func (hcs *httpHealthCheckService) Check() error { if err != nil { return err } + resp.Body.Close() if resp.StatusCode >= 200 && resp.StatusCode < 400 { return nil diff --git a/sd/publish.go b/sd/publish.go index abe5c5b..c1c19c8 100644 --- a/sd/publish.go +++ b/sd/publish.go @@ -14,6 +14,8 @@ import ( "go.fuhry.dev/runtime/utils" "go.fuhry.dev/runtime/utils/hostname" "go.fuhry.dev/runtime/utils/log" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) const ( @@ -36,8 +38,8 @@ type SDPublisher struct { IP6 string EtcdClient *etcd_client.Client - mtx *sync.Mutex - wg *sync.WaitGroup + mtx sync.Mutex + wg sync.WaitGroup logger log.Logger leases etcd_client.Lease @@ -53,15 +55,10 @@ type recordToPublish struct { } func (s *SDPublisher) init() error { - if s.mtx == nil { - s.mtx = &sync.Mutex{} - s.wg = &sync.WaitGroup{} - } - s.mtx.Lock() defer s.mtx.Unlock() - s.logger = log.WithPrefix(fmt.Sprintf("SDPublish(%s)", s.Service)) + s.logger = log.Default().WithPrefix(fmt.Sprintf("SDPublish(%s)", s.Service)) if s.EtcdClient == nil { cl, err := NewDefaultEtcdClient() @@ -109,7 +106,7 @@ func (s *SDPublisher) init() error { s.LocalRegion = hostname.RegionName() } - if s.Regions == nil || len(s.Regions) == 0 { + if len(s.Regions) == 0 { s.Regions = []string{ hostname.RegionName(), } @@ -214,27 +211,23 @@ func (s *SDPublisher) unpublish() { } func (s *SDPublisher) recordsToPublish() []recordToPublish { - records := make([]recordToPublish, 0) - - records = append(records, recordToPublish{ - rtype: "SRV", - path: s.srvRecordPath(), - val: s.srvRecordJson(s.LocalRegion), - }) - - records = append(records, recordToPublish{ - rtype: "A", - path: s.aRecordPath(), - val: s.aRecordJson(), - }) - - records = append(records, recordToPublish{ - rtype: "AAAA", - path: s.aaaaRecordPath(), - val: s.aaaaRecordJson(), - }) - - return records + return []recordToPublish{ + { + rtype: "SRV", + path: s.srvRecordPath(), + val: s.srvRecordJson(s.LocalRegion), + }, + { + rtype: "A", + path: s.aRecordPath(), + val: s.aRecordJson(), + }, + { + rtype: "AAAA", + path: s.aaaaRecordPath(), + val: s.aaaaRecordJson(), + }, + } } func (s *SDPublisher) renewalLoop() { @@ -263,19 +256,34 @@ func (s *SDPublisher) renewalTick() { var err error var kar *etcd_client.LeaseKeepAliveResponse - kar, err = nil, nil + validLease := false if s.lease != nil { s.logger.V(3).Debugf("Doing keepalive for service %s on %s", s.Service, s.AdvertiseHost) - kar, err = s.leases.KeepAliveOnce(s.ctx, s.lease.ID) - } else { - s.logger.V(1).Debugf("no active lease for service %s on %s, attempting to acquire one", s.Service, s.AdvertiseHost) + kaCtx, cancel := context.WithTimeout(s.ctx, 3*time.Second) + defer cancel() + kar, err = s.leases.KeepAliveOnce(kaCtx, s.lease.ID) + if err == nil && kar != nil && kar.ID != etcd_client.NoLease { + validLease = true + for _, record := range s.recordsToPublish() { + kv := etcd_client.NewKV(s.EtcdClient) + _, err := kv.Get(s.ctx, record.path) + if e, ok := status.FromError(err); ok && e != nil { + if e.Code() == codes.NotFound { + validLease = false + break + } + } + } + } } - if kar == nil || err != nil || (kar != nil && kar.ID == etcd_client.NoLease) { + + if !validLease { + s.logger.V(1).Debugf("no active lease for service %s on %s, attempting to acquire one", s.Service, s.AdvertiseHost) // we lost our lease - get a new one s.lease = nil lease, err := s.leases.Grant(s.ctx, leaseMaxLifetime) if lease == nil || err != nil { - s.logger.Debugf("warning: lost our lease and failed to get a new one. ") + s.logger.Debugf("warning: lost our lease and failed to get a new one.") return } s.logger.V(2).Debugf("leaseID: %016x", lease.ID) diff --git a/utils/log/log.go b/utils/log/log.go index 4ee6f05..e753541 100644 --- a/utils/log/log.go +++ b/utils/log/log.go @@ -218,7 +218,7 @@ func (l *logger) WithPrefix(prefix string) Logger { return &logger{ internalLogger: &internalLogger{ Logger: l.Logger, - prefix: l.internalLogger.prefix, + prefix: prefix, level: l.level, }, }