Commit 3427aa01 authored by KeYong's avatar KeYong

Merge remote-tracking branch 'origin/develop_dl' into develop_dl

parents f4fc109a aaa8fa21
......@@ -500,6 +500,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
StringBuilder endIndex = new StringBuilder(iotCode).insert(8, '/');
String iotTopic = "influxdb/" + endIndex;
JSONObject msg = new JSONObject();
msg.put("traceId", equipmentSpeIndex.getId() + "");
msg.put(equipmentSpeIndex.getEquipmentIndexKey(), value);
mqttSendGateway.sendToMqtt(iotTopic, JSON.toJSONString(msg));
......@@ -791,6 +792,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
String iotTopic = "influxdb/" + endIndex;
JSONObject msg = new JSONObject();
msg.put(equipmentSpeIndex.getEquipmentIndexKey(), value);
msg.put("traceId", equipmentSpeIndex.getId() + "");
mqttSendGateway.sendToMqtt(iotTopic, JSON.toJSONString(msg));
List<EquipmentSpecificVo> eqIotCodeList = iEquipmentSpecificSerivce.getEquipAndCarIotcodeByIotcode(iotCode);
......
......@@ -1141,7 +1141,9 @@
<changeSet author="keyong" id="168623599">
<preConditions onFail="MARK_RAN">
<tableExists tableName="wl_car" />
<not>
<columnExists tableName="wl_car" columnName="max_speed"/>
</not>
</preConditions>
<comment>新增属性数据</comment>
<sql>
......
......@@ -16,6 +16,7 @@ import org.springframework.core.io.Resource;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.typroject.tyboot.component.emq.EmqKeeper;
......@@ -78,7 +79,7 @@ public class KafkaConsumerService implements ApplicationRunner {
if (messages.isPresent()) {
JSONObject object = JSONObject.fromObject(record.value());
com.alibaba.fastjson.JSONObject jsonObj = ClassToJsonUtil.class2json(object, commonMessage, record.topic());
if ((StringUtils.isEmpty(filePath)) || (!ObjectUtils.isEmpty(jsonObj) && Boolean.TRUE.equals(isSendEmq(jsonObj)))) {
if ((StringUtils.isEmpty(filePath)) || (CollectionUtils.isEmpty(codeListInfo)) || (!ObjectUtils.isEmpty(jsonObj) && Boolean.TRUE.equals(isSendEmq(jsonObj)))) {
emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 0, false);
}
}
......@@ -98,7 +99,7 @@ public class KafkaConsumerService implements ApplicationRunner {
try {
JSONObject object = JSONObject.fromObject(record.value());
com.alibaba.fastjson.JSONObject jsonObj = ClassToJsonUtil.class2json(object, commonMessage, record.topic());
if ((StringUtils.isEmpty(filePath)) || (!ObjectUtils.isEmpty(jsonObj) && Boolean.TRUE.equals(isSendEmq(jsonObj)))) {
if ((StringUtils.isEmpty(filePath)) || (CollectionUtils.isEmpty(codeListInfo)) || (!ObjectUtils.isEmpty(jsonObj) && Boolean.TRUE.equals(isSendEmq(jsonObj)))) {
emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 0, false);
}
ack.acknowledge();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment