Commit 2946ec84 authored by litengwei's avatar litengwei

任务 11626

parent 0335913e
...@@ -24,6 +24,7 @@ import com.yeejoin.equipmanage.remote.RemoteSecurityService; ...@@ -24,6 +24,7 @@ import com.yeejoin.equipmanage.remote.RemoteSecurityService;
import com.yeejoin.equipmanage.service.*; import com.yeejoin.equipmanage.service.*;
import com.yeejoin.equipmanage.utils.BeanUtil; import com.yeejoin.equipmanage.utils.BeanUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.BeanUtils; import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
...@@ -35,6 +36,7 @@ import org.springframework.transaction.support.TransactionSynchronizationManager ...@@ -35,6 +36,7 @@ import org.springframework.transaction.support.TransactionSynchronizationManager
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import org.typroject.tyboot.component.emq.EmqKeeper;
import org.typroject.tyboot.core.foundation.utils.ValidationUtil; import org.typroject.tyboot.core.foundation.utils.ValidationUtil;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
...@@ -90,6 +92,9 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -90,6 +92,9 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
EquipmentSpecificMapper equipmentSpecificMapper; EquipmentSpecificMapper equipmentSpecificMapper;
@Autowired @Autowired
protected EmqKeeper emqKeeper;
@Autowired
FireFightingSystemMapper FireFightingSystemMapper; FireFightingSystemMapper FireFightingSystemMapper;
static IFireFightingSystemService fireFightingSystemService; static IFireFightingSystemService fireFightingSystemService;
...@@ -210,6 +215,17 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -210,6 +215,17 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
return; return;
} }
log.info(String.format("收到mqtt消息:%s", message)); log.info(String.format("收到mqtt消息:%s", message));
// 发送emq消息转kafka
JSONObject jsonObject = new JSONObject();
jsonObject.put("topic", topic);
jsonObject.put("data",message);
try {
emqKeeper.getMqttClient().publish("eqm.iot.created",jsonObject.toString().getBytes(),1,false);
} catch (MqttException e) {
e.printStackTrace();
}
if (!StringUtils.isEmpty(traceId)) { if (!StringUtils.isEmpty(traceId)) {
String finalTraceId = traceId; String finalTraceId = traceId;
List<IotDataVO> collect = iotDatalist.stream().map(x -> { List<IotDataVO> collect = iotDatalist.stream().map(x -> {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment