Commit e101adf5 authored by 张森's avatar 张森

kafka消费问题处理,

装备发送iot消息添加traceId字段
parent 911f9d8c
...@@ -500,6 +500,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -500,6 +500,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
StringBuilder endIndex = new StringBuilder(iotCode).insert(8, '/'); StringBuilder endIndex = new StringBuilder(iotCode).insert(8, '/');
String iotTopic = "influxdb/" + endIndex; String iotTopic = "influxdb/" + endIndex;
JSONObject msg = new JSONObject(); JSONObject msg = new JSONObject();
msg.put("traceId", equipmentSpeIndex.getId() + "");
msg.put(equipmentSpeIndex.getEquipmentIndexKey(), value); msg.put(equipmentSpeIndex.getEquipmentIndexKey(), value);
mqttSendGateway.sendToMqtt(iotTopic, JSON.toJSONString(msg)); mqttSendGateway.sendToMqtt(iotTopic, JSON.toJSONString(msg));
...@@ -791,6 +792,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -791,6 +792,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
String iotTopic = "influxdb/" + endIndex; String iotTopic = "influxdb/" + endIndex;
JSONObject msg = new JSONObject(); JSONObject msg = new JSONObject();
msg.put(equipmentSpeIndex.getEquipmentIndexKey(), value); msg.put(equipmentSpeIndex.getEquipmentIndexKey(), value);
msg.put("traceId", equipmentSpeIndex.getId() + "");
mqttSendGateway.sendToMqtt(iotTopic, JSON.toJSONString(msg)); mqttSendGateway.sendToMqtt(iotTopic, JSON.toJSONString(msg));
List<EquipmentSpecificVo> eqIotCodeList = iEquipmentSpecificSerivce.getEquipAndCarIotcodeByIotcode(iotCode); List<EquipmentSpecificVo> eqIotCodeList = iEquipmentSpecificSerivce.getEquipAndCarIotcodeByIotcode(iotCode);
......
...@@ -1141,7 +1141,9 @@ ...@@ -1141,7 +1141,9 @@
<changeSet author="keyong" id="168623599"> <changeSet author="keyong" id="168623599">
<preConditions onFail="MARK_RAN"> <preConditions onFail="MARK_RAN">
<tableExists tableName="wl_car" /> <not>
<columnExists tableName="wl_car" columnName="max_speed"/>
</not>
</preConditions> </preConditions>
<comment>新增属性数据</comment> <comment>新增属性数据</comment>
<sql> <sql>
......
...@@ -16,6 +16,7 @@ import org.springframework.core.io.Resource; ...@@ -16,6 +16,7 @@ import org.springframework.core.io.Resource;
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import org.typroject.tyboot.component.emq.EmqKeeper; import org.typroject.tyboot.component.emq.EmqKeeper;
...@@ -78,7 +79,7 @@ public class KafkaConsumerService implements ApplicationRunner { ...@@ -78,7 +79,7 @@ public class KafkaConsumerService implements ApplicationRunner {
if (messages.isPresent()) { if (messages.isPresent()) {
JSONObject object = JSONObject.fromObject(record.value()); JSONObject object = JSONObject.fromObject(record.value());
com.alibaba.fastjson.JSONObject jsonObj = ClassToJsonUtil.class2json(object, commonMessage, record.topic()); com.alibaba.fastjson.JSONObject jsonObj = ClassToJsonUtil.class2json(object, commonMessage, record.topic());
if ((StringUtils.isEmpty(filePath)) || (!ObjectUtils.isEmpty(jsonObj) && Boolean.TRUE.equals(isSendEmq(jsonObj)))) { if ((StringUtils.isEmpty(filePath)) || (CollectionUtils.isEmpty(codeListInfo)) || (!ObjectUtils.isEmpty(jsonObj) && Boolean.TRUE.equals(isSendEmq(jsonObj)))) {
emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 0, false); emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 0, false);
} }
} }
...@@ -98,7 +99,7 @@ public class KafkaConsumerService implements ApplicationRunner { ...@@ -98,7 +99,7 @@ public class KafkaConsumerService implements ApplicationRunner {
try { try {
JSONObject object = JSONObject.fromObject(record.value()); JSONObject object = JSONObject.fromObject(record.value());
com.alibaba.fastjson.JSONObject jsonObj = ClassToJsonUtil.class2json(object, commonMessage, record.topic()); com.alibaba.fastjson.JSONObject jsonObj = ClassToJsonUtil.class2json(object, commonMessage, record.topic());
if ((StringUtils.isEmpty(filePath)) || (!ObjectUtils.isEmpty(jsonObj) && Boolean.TRUE.equals(isSendEmq(jsonObj)))) { if ((StringUtils.isEmpty(filePath)) || (CollectionUtils.isEmpty(codeListInfo)) || (!ObjectUtils.isEmpty(jsonObj) && Boolean.TRUE.equals(isSendEmq(jsonObj)))) {
emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 0, false); emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 0, false);
} }
ack.acknowledge(); ack.acknowledge();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment