]> go.fuhry.dev Git - runtime.git/commitdiff
machines: mqtt client: use credentials provider, not updateCreds
authorDan Fuhry <dan@fuhry.com>
Fri, 13 Sep 2024 01:24:08 +0000 (21:24 -0400)
committerDan Fuhry <dan@fuhry.com>
Fri, 13 Sep 2024 01:24:08 +0000 (21:24 -0400)
Fixes mqtt not successfully connecting on the first try (only on retries).

machines/event_watcher.go

index 679a9191bb87290efda3a94f91ed8c91f30fb4da..4efa6320b61ce2eb566fa73a695157f66dae1b2f 100644 (file)
@@ -33,35 +33,28 @@ func (mc *machinesClient) NewEventListener(ctx context.Context) (chan MachinesMq
        mqttOpts := mqtt.NewClientOptions()
 
        mqttOpts.Order = false
-       mc.logger.V(1).Infof("attempting to connect to Machines MQTT server at %s", mc.eventsUrl)
+       mqttLogger.V(1).Infof("attempting to connect to Machines MQTT server at %s", mc.eventsUrl)
        eventsUrl, err := url.Parse(mc.eventsUrl)
        if err != nil {
                return nil, fmt.Errorf("events url %q is invalid: %v", mc.eventsUrl, err)
        }
 
-       updateCreds := func(mqttOpts *mqtt.ClientOptions) error {
+       credentialsProvider := func() (string, string) {
                accessToken, err := oauthClient.getAccessToken()
                if err != nil {
-                       return err
+                       mqttLogger.Errorf("error getting current access token: %v", err)
+                       return "x", "x"
                }
 
-               mc.logger.V(2).Debugf("setting username/password to %s/%s", log.Redact(accessToken), "x")
-               mqttOpts.Username = accessToken
-               mqttOpts.Password = "x"
-
-               return nil
-       }
-
-       if err = updateCreds(mqttOpts); err != nil {
-               return nil, err
+               mqttLogger.V(2).Debugf("setting username/password to %s/%s", log.Redact(accessToken), "x")
+               return accessToken, "x"
        }
 
        mqttOpts.Servers = append(mqttOpts.Servers, eventsUrl)
-       mqttOpts.SetReconnectingHandler(func(client mqtt.Client, opts *mqtt.ClientOptions) {
-               updateCreds(opts)
-       })
+       mqttOpts.SetCredentialsProvider(credentialsProvider)
        mqttOpts.SetPingTimeout(10 * time.Second)
        mqttOpts.SetConnectTimeout(10 * time.Second)
+       mqttOpts.SetCleanSession(true)
        mqttOpts.SetAutoReconnect(true)
        mqttOpts.SetConnectRetry(false)
        mqttOpts.SetMaxReconnectInterval(30 * time.Second)
@@ -115,7 +108,7 @@ func (mc *machinesClient) NewEventListener(ctx context.Context) (chan MachinesMq
                break
        case <-ctx.Done():
                close(msgChan)
-               return nil, context.Canceled
+               return nil, ctx.Err()
        }
 
        go (func() {