"go.fuhry.dev/runtime/utils"
"go.fuhry.dev/runtime/utils/hostname"
"go.fuhry.dev/runtime/utils/log"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
)
const (
IP6 string
EtcdClient *etcd_client.Client
- mtx *sync.Mutex
- wg *sync.WaitGroup
+ mtx sync.Mutex
+ wg sync.WaitGroup
logger log.Logger
leases etcd_client.Lease
}
func (s *SDPublisher) init() error {
- if s.mtx == nil {
- s.mtx = &sync.Mutex{}
- s.wg = &sync.WaitGroup{}
- }
-
s.mtx.Lock()
defer s.mtx.Unlock()
- s.logger = log.WithPrefix(fmt.Sprintf("SDPublish(%s)", s.Service))
+ s.logger = log.Default().WithPrefix(fmt.Sprintf("SDPublish(%s)", s.Service))
if s.EtcdClient == nil {
cl, err := NewDefaultEtcdClient()
s.LocalRegion = hostname.RegionName()
}
- if s.Regions == nil || len(s.Regions) == 0 {
+ if len(s.Regions) == 0 {
s.Regions = []string{
hostname.RegionName(),
}
}
func (s *SDPublisher) recordsToPublish() []recordToPublish {
- records := make([]recordToPublish, 0)
-
- records = append(records, recordToPublish{
- rtype: "SRV",
- path: s.srvRecordPath(),
- val: s.srvRecordJson(s.LocalRegion),
- })
-
- records = append(records, recordToPublish{
- rtype: "A",
- path: s.aRecordPath(),
- val: s.aRecordJson(),
- })
-
- records = append(records, recordToPublish{
- rtype: "AAAA",
- path: s.aaaaRecordPath(),
- val: s.aaaaRecordJson(),
- })
-
- return records
+ return []recordToPublish{
+ {
+ rtype: "SRV",
+ path: s.srvRecordPath(),
+ val: s.srvRecordJson(s.LocalRegion),
+ },
+ {
+ rtype: "A",
+ path: s.aRecordPath(),
+ val: s.aRecordJson(),
+ },
+ {
+ rtype: "AAAA",
+ path: s.aaaaRecordPath(),
+ val: s.aaaaRecordJson(),
+ },
+ }
}
func (s *SDPublisher) renewalLoop() {
var err error
var kar *etcd_client.LeaseKeepAliveResponse
- kar, err = nil, nil
+ validLease := false
if s.lease != nil {
s.logger.V(3).Debugf("Doing keepalive for service %s on %s", s.Service, s.AdvertiseHost)
- kar, err = s.leases.KeepAliveOnce(s.ctx, s.lease.ID)
- } else {
- s.logger.V(1).Debugf("no active lease for service %s on %s, attempting to acquire one", s.Service, s.AdvertiseHost)
+ kaCtx, cancel := context.WithTimeout(s.ctx, 3*time.Second)
+ defer cancel()
+ kar, err = s.leases.KeepAliveOnce(kaCtx, s.lease.ID)
+ if err == nil && kar != nil && kar.ID != etcd_client.NoLease {
+ validLease = true
+ for _, record := range s.recordsToPublish() {
+ kv := etcd_client.NewKV(s.EtcdClient)
+ _, err := kv.Get(s.ctx, record.path)
+ if e, ok := status.FromError(err); ok && e != nil {
+ if e.Code() == codes.NotFound {
+ validLease = false
+ break
+ }
+ }
+ }
+ }
}
- if kar == nil || err != nil || (kar != nil && kar.ID == etcd_client.NoLease) {
+
+ if !validLease {
+ s.logger.V(1).Debugf("no active lease for service %s on %s, attempting to acquire one", s.Service, s.AdvertiseHost)
// we lost our lease - get a new one
s.lease = nil
lease, err := s.leases.Grant(s.ctx, leaseMaxLifetime)
if lease == nil || err != nil {
- s.logger.Debugf("warning: lost our lease and failed to get a new one. ")
+ s.logger.Debugf("warning: lost our lease and failed to get a new one.")
return
}
s.logger.V(2).Debugf("leaseID: %016x", lease.ID)