From: Dan Fuhry Date: Fri, 13 Sep 2024 01:24:08 +0000 (-0400) Subject: machines: mqtt client: use credentials provider, not updateCreds X-Git-Url: https://go.fuhry.dev/?a=commitdiff_plain;h=9f7d19d23886aae02a542d8b96d18f00752fa7a7;p=runtime.git machines: mqtt client: use credentials provider, not updateCreds Fixes mqtt not successfully connecting on the first try (only on retries). --- diff --git a/machines/event_watcher.go b/machines/event_watcher.go index 679a919..4efa632 100644 --- a/machines/event_watcher.go +++ b/machines/event_watcher.go @@ -33,35 +33,28 @@ func (mc *machinesClient) NewEventListener(ctx context.Context) (chan MachinesMq mqttOpts := mqtt.NewClientOptions() mqttOpts.Order = false - mc.logger.V(1).Infof("attempting to connect to Machines MQTT server at %s", mc.eventsUrl) + mqttLogger.V(1).Infof("attempting to connect to Machines MQTT server at %s", mc.eventsUrl) eventsUrl, err := url.Parse(mc.eventsUrl) if err != nil { return nil, fmt.Errorf("events url %q is invalid: %v", mc.eventsUrl, err) } - updateCreds := func(mqttOpts *mqtt.ClientOptions) error { + credentialsProvider := func() (string, string) { accessToken, err := oauthClient.getAccessToken() if err != nil { - return err + mqttLogger.Errorf("error getting current access token: %v", err) + return "x", "x" } - mc.logger.V(2).Debugf("setting username/password to %s/%s", log.Redact(accessToken), "x") - mqttOpts.Username = accessToken - mqttOpts.Password = "x" - - return nil - } - - if err = updateCreds(mqttOpts); err != nil { - return nil, err + mqttLogger.V(2).Debugf("setting username/password to %s/%s", log.Redact(accessToken), "x") + return accessToken, "x" } mqttOpts.Servers = append(mqttOpts.Servers, eventsUrl) - mqttOpts.SetReconnectingHandler(func(client mqtt.Client, opts *mqtt.ClientOptions) { - updateCreds(opts) - }) + mqttOpts.SetCredentialsProvider(credentialsProvider) mqttOpts.SetPingTimeout(10 * time.Second) mqttOpts.SetConnectTimeout(10 * time.Second) + mqttOpts.SetCleanSession(true) mqttOpts.SetAutoReconnect(true) mqttOpts.SetConnectRetry(false) mqttOpts.SetMaxReconnectInterval(30 * time.Second) @@ -115,7 +108,7 @@ func (mc *machinesClient) NewEventListener(ctx context.Context) (chan MachinesMq break case <-ctx.Done(): close(msgChan) - return nil, context.Canceled + return nil, ctx.Err() } go (func() {