MQTT使用-集成SpringBoot自动装配

乐云一
  • 消息队列
  • MQTT
  • IOT
About 1232 wordsAbout 4 min

前篇介绍了MQTT的基本知识与其特性 MQTT协议入门open in new window

使用

消息服务器

首先需要安装一个消息服务器作为MQTT协议的仓库

这里推荐使用 EMQXopen in new window

EMQX 是一款大规模可弹性伸缩的云原生分布式物联网 MQTT 消息服务器。

作为全球最具扩展性的 MQTT 消息服务器,EMQX 提供了高效可靠海量物联网设备连接,能够高性能实时移动与处理消息和事件流数据,帮助您快速构建关键业务的物联网平台与应用。

docker安装

1、获取 Docker 镜像

docker pull emqx/emqx:latest

2、启动 Docker 容器

docker run -d --name emqx -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:latest

记得开放18083【页面】1883【MQTT协议】8081【页面API接口】8083【MQTT-WebSocket】8084【MQTT-SSl】端口

访问IP:18083,检查是否进入到EMQ Dashboard页面,账号密码默认:admin / public

客户端连接使用

配置

    @Bean
    public MqttConnectOptions mqttConnectOptions() throws CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException, KeyManagementException {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName("mqtt用户名");
        options.setServerURIs(new String[]{"mqtt地址"});
        options.setPassword("mqtt密码".toCharArray());
        options.setCleanSession(true);
        options.setKeepAliveInterval(90);
        options.setAutomaticReconnect(true);
        options.setMaxInflight(10000);
        options.setConnectionTimeout(120);
        options.setSocketFactory(SslUtil.getSslSocktet("ssl证书地址"));
        return options;
    }

    @Bean
    public MqttAsyncClient mqttAsyncClient(MqttConnectOptions mqttConnectOptions) {
        MqttAsyncClient sampleClient = null;
        try {
            sampleClient = new MqttAsyncClient("mqtt地址", "clientId");
            sampleClient.connect(mqttConnectOptions);
            boolean successful = sampleClient.isConnected();
            long startTime = Clock.systemDefaultZone().millis();
            long timeout = Integer.parseInt("超时时间") * 1000;
            long endTime = startTime;
            while (!successful && (endTime - startTime) <= timeout) {
                Thread.sleep(10);
                successful = sampleClient.isConnected();
                endTime = Clock.systemDefaultZone().millis();
            }
            if (!successful) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("mqtt client connect is timeout");
            }
        } catch (Exception e) {
            logger.error("mqtt client connect is failed.");
        }
        return sampleClient;
    }

发布主题

    @Autowired
    private MqttAsyncClient mqttAsyncClient;

    public boolean messagePublish(String topic,String message,Integer qos) {
//        String topic = "topic-mqtt主题";
        if (StringUtils.isBlank(topic) || StringUtils.isBlank(message)) {
            return false;
        }
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setPayload(message.getBytes());
        mqttMessage.setQos(ObjectUtil.isNull(qos) ? 0 : qos);
        try {
            mqttAsyncClient.publish(topic, mqttMessage);
        } catch (Exception e) {
            return false;
        }
        return true;
    }

订阅主题

@Service
public class MqttSubscribe implements MqttCallback {
    @Override
    public void connectionLost(Throwable throwable) {
        //失去连接 -  重连
    }

    @Override
    public void messageArrived(String s, MqttMessage mqttMessage) {
        String topic = s;
        //消息
        JSONObject messageJson = JSONObject.parseObject(new String(mqttMessage.getPayload(), StandardCharsets.UTF_8));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        //断开连接
    }
}

虽然使用了SpringBoot,但是由于Springboot整合mqtt并不完整,没有一套自动装配的流程,使用需要我们自己构造一个客户端对象进行mqtt连接

开发Springboot-mqtt自动装配项目

项目https://github.com/LeYunone/springboot-mqtt-leyunoneopen in new window

配置

@ConfigurationProperties(prefix = "spring.mqtt")
@Data
public class MqttProperties {

    private String url;

    private String clientId = "mqtt-sdk-client";

    private String username;

    private String password;

    private String group = "default-group";

    private int keepalive;

    private String ssl;
    
    private int timeout;

    private List<MqttTopic> topics = new ArrayList<>();
}

spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.leyunone.springmqtt.config.MqttClientAutoConfiguration

自动装配

@Configuration
@ConditionalOnProperty(value = "spring.mqtt.enabled", havingValue = "true", matchIfMissing = true)
@EnableConfigurationProperties(MqttProperties.class)
public class MqttClientAutoConfiguration {
    
    private static final Logger logger = LoggerFactory.getLogger(MqttClientAutoConfiguration.class);

    @Bean
    public MqttConnectOptions mqttConnectOptions(MqttProperties mqttProperties) {
		...
    }

    @Bean
    public MqttAsyncClient mqttAsyncClient(MqttProperties mqttProperties, MqttConnectOptions mqttConnectOptions) {
		...
    }


    @Bean
    public MqttMessageDispatchHandler mqttMessageDispatchHandler(){
        return new MultiHandlerDispatchHandler();
    }

    /**
     * 自动订阅
     * @param mqttProperties mqtt配置
     * @param mqttAsyncClient mqtt客户端
     * @return 订阅实现
     */
    @ConditionalOnMissingBean({MqttAutoSubscribe.class})
    @Bean
    public MqttAutoSubscribe mqttAutoSubscribe(MqttProperties mqttProperties,MqttAsyncClient mqttAsyncClient,MqttMessageDispatchHandler dispatchHandler){
        return new MqttAutoSubscribe(mqttProperties, mqttAsyncClient,dispatchHandler);
    }

自定义注解

消费者: @MqttConsumerHandler

@Component
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface MqttConsumerHandler {

    /**
     * bean对象名
     * @return
     */
    @AliasFor(annotation = Component.class)
    String value() default "";
}

消费者方法: @MqttSubscribe

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface MqttSubscribe {

    @AliasFor("topic")
    String topic() default "\\.*.*";

    @AliasFor("value")
    String value() default "";
}

订阅者配置

public class MultiHandlerDispatchHandler extends MqttCallback implements InitializingBean {

    Logger logger = LoggerFactory.getLogger(MultiHandlerDispatchHandler.class);

    private final List <ConsumerHandler> consumerHandlers = new ArrayList<>();

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) {
        consumerHandlers.forEach(h -> {
            String pattern = h.pattern;
            boolean matches = Pattern.matches(pattern,topic);
            if(matches){
                h.getHandleMethod().invoke(h.beanObject,topic,mqttMessage);
            }
        });
    }
	//加载消费者
    @Override
    public void afterPropertiesSet() {
        Map<String, Object> handlerArray = context.getBeansWithAnnotation(MqttConsumerHandler.class);
        logger.info("mqtt consumer handler list {}",handlerArray.keySet());
        if(handlerArray.size() == 0){
            throw new NullPointerException("mqtt consumer bean is empty...");
        }
        handlerArray.forEach((k,v) -> {
            Class<?> aClass = v.getClass();
            Method[] methods = aClass.getMethods();
            if(methods.length == 0){
                throw new NullPointerException("mqtt consumer handler method is empty,handler "+k);
            }
            Method mqttMessageHandlerMethod = null;
            for(Method method :methods){
                MqttSubscribe annotation = method.getAnnotation(MqttSubscribe.class);
                if(null != annotation){
                    mqttMessageHandlerMethod = method;
                    break;
                }
            }
            Objects.requireNonNull(mqttMessageHandlerMethod,"No consumption method found on this handler "+k);
            MqttSubscribe annotation = AnnotationUtils.getAnnotation(mqttMessageHandlerMethod,MqttSubscribe.class);
            Objects.requireNonNull(annotation,"Method is missing necessary annotations "+mqttMessageHandlerMethod.getName());
            String pattern = annotation.topic();
            ConsumerHandler consumerHandler = new ConsumerHandler(k,v,mqttMessageHandlerMethod,pattern);
            consumerHandlers.add(consumerHandler);
        });
    }
}

自动订阅

    private static final String GROUP_SUBSCRIBE_PREFIX = "$share/";
    private static final String DEFAULT_SUBSCRIBE_PREFIX = "$queue/";
    final private MqttProperties mqttProperties;
    final private MqttAsyncClient mqttAsyncClient;
    final private MqttMessageDispatchHandler dispatchHandler;

    @Override
    public void afterPropertiesSet() {
        if (Thread.interrupted()) {
            return;
        }
        List<MqttProperties.MqttTopic> mqttTopics = mqttProperties.getTopics();
        if (CollectionUtils.isEmpty(mqttTopics)) {
            logger.warn("subscribe topic is blank.subscribe stop");
            return;
        }
        String[] topics = mqttTopics.stream().map(mqttTopic -> {
            String group = mqttProperties.getGroup();
            String topic = mqttTopic.getTopic();
            if (!StringUtils.isEmpty(group)) {
                return GROUP_SUBSCRIBE_PREFIX + group + "/" + topic;
            } else {
                return DEFAULT_SUBSCRIBE_PREFIX + topic;
            }
        }).collect(Collectors.toList()).toArray(new String[mqttTopics.size()]);
        int[] qos = mqttTopics.stream().map(MqttProperties.MqttTopic::getQos).mapToInt(Integer::intValue).toArray();
        try {
            dispatchHandler.setMqttAutoSubscribe(this);
            dispatchHandler.setMqttAsyncClient(mqttAsyncClient);
            mqttAsyncClient.setCallback(dispatchHandler);
            mqttAsyncClient.subscribe(topics, qos);
            logger.info("subscribe success topic:{}", Arrays.toString(topics));
        } catch (Exception e) {
            logger.error("subscribe failed...", e);
        }
    }

最终使用

spring:
 mqtt:
    url: tcp://192.168.151.218:1883
    clientId: iothub.${spring.application.name}:${server.port}
    topics:
      - topic: keepalive/smarthome
        qos: 1
    username: iot_service
    password: TYGHJK&*(&8923
    group: ${spring.application.name}
@MqttConsumerHandler
@Component
public class MqttMessageConsumer {

    private final static Logger logger = LoggerFactory.getLogger(MqttMessageConsumer.class);
    
    @MqttSubscribe(topic = "指定的topic主题")
    public void messageAccept(String topic, MqttMessage message) {
        logger.info("MQTT message receive topic:{},message:{}", topic, message.toString());
    }
}
Last update:
Contributors: leyunone
Comments
  • Latest
  • Oldest
  • Hottest
Powered by Waline v2.14.7