Commit 57e151fd authored by wujiang's avatar wujiang

提交

parent 7831941b
...@@ -22,9 +22,9 @@ public class PermissionInterceptorContext { ...@@ -22,9 +22,9 @@ public class PermissionInterceptorContext {
public static void clean() { public static void clean() {
if (requestContext != null) { if (requestContext != null) {
logger.info("PermissionInterceptorContext clean RestThreadLocal......Begin"); // logger.info("PermissionInterceptorContext clean RestThreadLocal......Begin");
requestContext.remove(); requestContext.remove();
logger.info("PermissionInterceptorContext clean RestThreadLocal......Done"); // logger.info("PermissionInterceptorContext clean RestThreadLocal......Done");
} }
} }
......
...@@ -82,7 +82,7 @@ public class CarIotNewListener extends EmqxListener { ...@@ -82,7 +82,7 @@ public class CarIotNewListener extends EmqxListener {
public void processMessage(String topic, MqttMessage message) throws Exception { public void processMessage(String topic, MqttMessage message) throws Exception {
//logger.info("----收到物联消息::topic---------------" + topic); //logger.info("----收到物联消息::topic---------------" + topic);
logger.info("----收到物联消息::message---------------" + message); //logger.info("----收到物联消息::message---------------" + message);
pooledExecutor.submit(() -> { pooledExecutor.submit(() -> {
jxiopCarIotListerServiceImpl.processMessage(topic,message); jxiopCarIotListerServiceImpl.processMessage(topic,message);
}); });
......
...@@ -61,7 +61,7 @@ public class JxiopCarIotListerServiceImpl { ...@@ -61,7 +61,7 @@ public class JxiopCarIotListerServiceImpl {
@Autowired @Autowired
private RedisTemplate redisTemplate; private RedisTemplate redisTemplate;
@Autowired @Autowired
private static Map<String,Object> cache = new HashMap<>(); private static ConcurrentHashMap<String,Object> cache = new ConcurrentHashMap<>();
// @Async("equipAsyncExecutor") // @Async("equipAsyncExecutor")
...@@ -70,13 +70,17 @@ public class JxiopCarIotListerServiceImpl { ...@@ -70,13 +70,17 @@ public class JxiopCarIotListerServiceImpl {
String deviceName = topic.split("/")[1]; String deviceName = topic.split("/")[1];
//根据topic 组装iotCode //根据topic 组装iotCode
String iotCode = measurement + deviceName; String iotCode = measurement + deviceName;
//处理并发
if(cache.get(iotCode)!=null)
{
return;
}
cache.put(iotCode,iotCode);
JSONObject jsonObject = JSONObject.parseObject(message.toString()); JSONObject jsonObject = JSONObject.parseObject(message.toString());
//判断是否有效坐标 //判断是否有效坐标
if (!ObjectUtils.isEmpty(jsonObject.get("FireCar_Longitude")) && !ObjectUtils.isEmpty(jsonObject.get("FireCar_Latitude")) if (!ObjectUtils.isEmpty(jsonObject.get("FireCar_Longitude")) && !ObjectUtils.isEmpty(jsonObject.get("FireCar_Latitude"))) {
&&cache.get(iotCode)==null) {
//判断是否存在未结束进程,如果不存在,则进入判断插入开始节点 //判断是否存在未结束进程,如果不存在,则进入判断插入开始节点
if (iWlCarMileageService.getUncompleteMileagByIotCode(iotCode)&&cache.get(iotCode)==null) { if (iWlCarMileageService.getUncompleteMileagByIotCode(iotCode)) {
cache.put(iotCode,iotCode);
WlCarMileage wlCarMileage = new WlCarMileage(); WlCarMileage wlCarMileage = new WlCarMileage();
wlCarMileage.setIotCode(iotCode); wlCarMileage.setIotCode(iotCode);
wlCarMileage.setDate(new Date()); wlCarMileage.setDate(new Date());
...@@ -99,7 +103,6 @@ public class JxiopCarIotListerServiceImpl { ...@@ -99,7 +103,6 @@ public class JxiopCarIotListerServiceImpl {
e.printStackTrace(); e.printStackTrace();
iWlCarMileageService.save(wlCarMileage); iWlCarMileageService.save(wlCarMileage);
} }
cache.remove(iotCode);
} }
this.updateCarLocation(jsonObject, iotCode); this.updateCarLocation(jsonObject, iotCode);
String coordinate = jsonObject.getString("FireCar_Longitude") + "," + jsonObject.getString("FireCar_Latitude"); String coordinate = jsonObject.getString("FireCar_Longitude") + "," + jsonObject.getString("FireCar_Latitude");
...@@ -113,7 +116,7 @@ public class JxiopCarIotListerServiceImpl { ...@@ -113,7 +116,7 @@ public class JxiopCarIotListerServiceImpl {
} }
} }
} }
cache.remove(iotCode);
} }
public String getAddress(double longitude, double lantitude) { public String getAddress(double longitude, double lantitude) {
......
...@@ -364,7 +364,6 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -364,7 +364,6 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
@Override @Override
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
public void handlerMqttIncrementMessage(String topic, String message) { public void handlerMqttIncrementMessage(String topic, String message) {
TopicEntityVo topicEntity = new TopicEntityVo(); TopicEntityVo topicEntity = new TopicEntityVo();
topicEntity.setTopic(topic); topicEntity.setTopic(topic);
topicEntity.setMessage(message); topicEntity.setMessage(message);
...@@ -410,15 +409,15 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -410,15 +409,15 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
log.info(String.format("收到mqtt消息:%s", message)); log.info(String.format("收到mqtt消息:%s", message));
// 发送emq消息转kafka // 发送emq消息转kafka
JSONObject jsonObject = new JSONObject(); // JSONObject jsonObject = new JSONObject();
jsonObject.put("topic", topic); // jsonObject.put("topic", topic);
jsonObject.put("data", message); // jsonObject.put("data", message);
try { // try {
emqKeeper.getMqttClient().publish("emq.iot.created", jsonObject.toString().getBytes(), 1, false); // emqKeeper.getMqttClient().publish("emq.iot.created", jsonObject.toString().getBytes(), 1, false);
} catch (MqttException e) { // } catch (MqttException e) {
log.info(String.format("发送eqm转kafka消息失败:%s", e.getMessage())); // log.info(String.format("发送eqm转kafka消息失败:%s", e.getMessage()));
} // }
if (!StringUtils.isEmpty(traceId)) { if (!StringUtils.isEmpty(traceId)) {
String finalTraceId = traceId; String finalTraceId = traceId;
......
...@@ -9,7 +9,7 @@ spring.datasource.driver-class-name=com.kingbase8.Driver ...@@ -9,7 +9,7 @@ spring.datasource.driver-class-name=com.kingbase8.Driver
#mybatis mapper file #mybatis mapper file
mybatis.mapper-locations=classpath:mapper/*.xml mybatis.mapper-locations=classpath:mapper/*.xml
#mybatis-plus #mybatis-plus
mybatis-plus.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl mybatis-plus.configuration.log-impl=org.apache.ibatis.logging.nologging.NoLoggingImpl
# mybatis entity package # mybatis entity package
mybatis.type-aliases-package=com.yeejoin.equipmanage.common.entity mybatis.type-aliases-package=com.yeejoin.equipmanage.common.entity
spring.jackson.time-zone=GMT+8 spring.jackson.time-zone=GMT+8
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment