mqttOpts := mqtt.NewClientOptions()
mqttOpts.Order = false
- mc.logger.V(1).Infof("attempting to connect to Machines MQTT server at %s", mc.eventsUrl)
+ mqttLogger.V(1).Infof("attempting to connect to Machines MQTT server at %s", mc.eventsUrl)
eventsUrl, err := url.Parse(mc.eventsUrl)
if err != nil {
return nil, fmt.Errorf("events url %q is invalid: %v", mc.eventsUrl, err)
}
- updateCreds := func(mqttOpts *mqtt.ClientOptions) error {
+ credentialsProvider := func() (string, string) {
accessToken, err := oauthClient.getAccessToken()
if err != nil {
- return err
+ mqttLogger.Errorf("error getting current access token: %v", err)
+ return "x", "x"
}
- mc.logger.V(2).Debugf("setting username/password to %s/%s", log.Redact(accessToken), "x")
- mqttOpts.Username = accessToken
- mqttOpts.Password = "x"
-
- return nil
- }
-
- if err = updateCreds(mqttOpts); err != nil {
- return nil, err
+ mqttLogger.V(2).Debugf("setting username/password to %s/%s", log.Redact(accessToken), "x")
+ return accessToken, "x"
}
mqttOpts.Servers = append(mqttOpts.Servers, eventsUrl)
- mqttOpts.SetReconnectingHandler(func(client mqtt.Client, opts *mqtt.ClientOptions) {
- updateCreds(opts)
- })
+ mqttOpts.SetCredentialsProvider(credentialsProvider)
mqttOpts.SetPingTimeout(10 * time.Second)
mqttOpts.SetConnectTimeout(10 * time.Second)
+ mqttOpts.SetCleanSession(true)
mqttOpts.SetAutoReconnect(true)
mqttOpts.SetConnectRetry(false)
mqttOpts.SetMaxReconnectInterval(30 * time.Second)
break
case <-ctx.Done():
close(msgChan)
- return nil, context.Canceled
+ return nil, ctx.Err()
}
go (func() {