Commit 931c4cca authored by suhuiguang's avatar suhuiguang

1.报检机构刷新触发规则口编写

parent 97e110ae
......@@ -11,7 +11,11 @@ import lombok.Getter;
@Getter
public enum TopicEnum {
JYJC_INSPECTION_UNIT_LIST_TOPIC("检验检测报建机构","jyjc/inspection/unit/list/topic");
/**
* 检验检验报检机构刷新相关主题
*/
INSPECTION_LIST_PUSH("检验检测报检机构数据推送主题","%s/inspection-list/push"),
INSPECTION_LIST_REFRESH("检验检测报检机构刷新请求主题","%s/inspection-list/refresh");
private final String name;
......
......@@ -81,9 +81,9 @@ public class RuleActionHandler {
}
private void publishMqttMessage(String componentKey, Object message) throws MqttException {
log.info("{}: {}", TopicEnum.JYJC_INSPECTION_UNIT_LIST_TOPIC.getName(), TopicEnum.JYJC_INSPECTION_UNIT_LIST_TOPIC.getTopic());
log.info("{}: {}", TopicEnum.INSPECTION_LIST_PUSH.getName(), TopicEnum.INSPECTION_LIST_PUSH.getTopic());
try {
emqKeeper.getMqttClient().publish(String.format(componentKey, TopicEnum.JYJC_INSPECTION_UNIT_LIST_TOPIC.getTopic()),
emqKeeper.getMqttClient().publish(String.format(componentKey, TopicEnum.INSPECTION_LIST_PUSH.getTopic()),
JSON.toJSONString(message).getBytes(), RuleConfig.DEFAULT_QOS, false);
} catch (MqttException e) {
log.error("Error publishing MQTT message: {}", e.getMessage());
......
package com.yeejoin.amos.boot.module.jyjc.biz.listener;
import com.yeejoin.amos.boot.module.jyjc.api.enums.TopicEnum;
import com.yeejoin.amos.boot.module.jyjc.biz.listener.message.BizMessage;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.typroject.tyboot.component.emq.EmqKeeper;
import org.typroject.tyboot.component.emq.EmqxListener;
import javax.annotation.PostConstruct;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* @author Administrator
*/
@Component
@Slf4j
public class InspectionOrgRefreshListener extends EmqxListener {
@Autowired
private EmqKeeper emqKeeper;
private static final BlockingQueue<BizMessage> BLOCKING_QUEUE = new LinkedBlockingQueue<>();
@Override
public void processMessage(String topic, MqttMessage message) {
if (log.isInfoEnabled()) {
log.info("收到消息主题:{},消息内容:{}", topic, message.toString());
}
BLOCKING_QUEUE.add(new BizMessage(topic,message));
}
@PostConstruct
public void init() throws Exception {
emqKeeper.subscript(TopicEnum.INSPECTION_LIST_REFRESH.getTopic(), 2, this);
new Thread(()->{
while (true) {
try {
BizMessage bizMessage = BLOCKING_QUEUE.take();
// 解析数据(前端对象未定义(componentKey【消息解析】、record、inspectionType)
// 组织规则数据
// 触发规则
} catch (Exception e) {
log.error("数据处理失败",e);
}
}
}).start();
}
}
package com.yeejoin.amos.boot.module.jyjc.biz.listener.message;
import lombok.Data;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.io.Serializable;
@Data
public class BizMessage implements Serializable {
/**
* 主题
*/
private String topic;
/**
* 消息题
*/
private MqttMessage message;
public BizMessage(String topic, MqttMessage message){
this.topic = topic;
this.message = message;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment