Commit 83e4a8c1 authored by caotao's avatar caotao

车辆里程相关问题处理

parent 3e872987
......@@ -29,6 +29,7 @@ import java.net.URL;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class CarIotNewListener extends EmqxListener {
......@@ -54,8 +55,8 @@ public class CarIotNewListener extends EmqxListener {
private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// 用于根据topicId 也就是物联设备id 存储对应的线程来进行计时
private static final HashMap<String, ThreadCar> deviceInfo = new HashMap();
private static final HashMap<String, String> deviceLastInfo = new HashMap();
public static final ConcurrentHashMap<String, ThreadCar> deviceInfo = new ConcurrentHashMap();
public static final ConcurrentHashMap<String, String> deviceLastInfo = new ConcurrentHashMap();
@Override
public void processMessage(String topic, MqttMessage message) throws Exception {
......@@ -64,13 +65,16 @@ public class CarIotNewListener extends EmqxListener {
logger.info("----收到物联消息::message---------------" + message);
String measurement = topic.split("/")[0];
String deviceName = topic.split("/")[1];
//根据topic 组装iotCode
String iotCode = measurement + deviceName;
//对于收到的消息进行数据转换
JSONObject jsonObject = JSONObject.parseObject(message.toString());
//通过消息存电量到扩展字段2
this.updateEquipBattery(jsonObject, iotCode);
logger.info("当前设备信息info" + JSON.toJSONString(deviceInfo));
//判断是否有效数据
if (!ObjectUtils.isEmpty(jsonObject.get("FireCar_Longitude")) || !ObjectUtils.isEmpty(jsonObject.get("FireCar_Latitude"))) {
//判断是否有效坐标
if (!ObjectUtils.isEmpty(jsonObject.get("FireCar_Longitude")) && !ObjectUtils.isEmpty(jsonObject.get("FireCar_Latitude"))) {
//判断是否存在未结束进程,如果不存在,则进入判断插入开始节点
if (iWlCarMileageService.getUncompleteMileagByIotCode(iotCode)) {
WlCarMileage wlCarMileage = new WlCarMileage();
wlCarMileage.setIotCode(iotCode);
......@@ -98,22 +102,23 @@ public class CarIotNewListener extends EmqxListener {
this.updateCarLocation(jsonObject, iotCode);
//如果map中已经存在该设备或者该设备有但是线程已经执行了
//存储上报上来的经纬度信息
String coordinate = jsonObject.getDoubleValue("FireCar_Longitude") + String.valueOf(jsonObject.getDoubleValue("FireCar_Latitude"));
if ((!deviceInfo.containsKey(iotCode))) {
String coordinate = jsonObject.getString("FireCar_Longitude")+ jsonObject.getString("FireCar_Latitude");
logger.info("----收到经纬度消息::coordinate---------------" + coordinate);
//判断缓存中是否已经存在该设备对应的倒计时线程
if (!deviceInfo.containsKey(iotCode)) {
logger.info("topic---------------------" + topic + "开启计时线程");
ThreadCar threadCar = new ThreadCar(topic, jsonObject, this.iWlCarMileageService, this.iotFeign, this.iCarService, clipping_time);
deviceInfo.put(iotCode, threadCar);
threadCar.start();
}
//判断缓存中是否已经存在该设备对应的经纬度信息
if (!deviceLastInfo.containsKey(iotCode)) {
deviceLastInfo.put(iotCode, coordinate);
}
//获取打不过全部系统时间
Long currentTime = System.currentTimeMillis();
//获取上报的时间
Long endTime = jsonObject.getLong("time");
//如果当前时间减去上报时间大小于配置的时间并且之前没包含上报的数据
if (((currentTime - endTime) < clipping_time) && (!deviceLastInfo.get(iotCode).equals(coordinate))) {
logger.info("----收到上次经纬度消息::coordinate---------------" + deviceLastInfo.get(iotCode));
//判断本次上报的经纬度信息是否与当前上报的经纬度信息相同
if ((!deviceLastInfo.get(iotCode).equals(coordinate))) {
try {
//销毁线程后移除
deviceInfo.get(iotCode).interrupt();
......@@ -121,7 +126,7 @@ public class CarIotNewListener extends EmqxListener {
} catch (Exception e) {
deviceInfo.remove(iotCode);
}
logger.info("topic---------------------" + topic + "开启计时线程");
logger.info("topic---------------------" + topic + "销毁后,开启计时线程");
ThreadCar threadCar = new ThreadCar(topic, jsonObject, this.iWlCarMileageService, this.iotFeign, this.iCarService, clipping_time);
deviceInfo.put(iotCode, threadCar);
//更新车辆的最新坐标数据
......@@ -231,7 +236,7 @@ public class CarIotNewListener extends EmqxListener {
if (car != null && power != 0) {
car.setExtra2(power.toString());
iCarService.updateById(car);
logger.info("-----------更新车辆设备电池电量成功--------");
logger.info(iotCode+"-----------更新车辆设备电池电量成功--------");
}
}
}
......@@ -12,6 +12,7 @@ import com.yeejoin.equipmanage.common.utils.CoordinateUtil;
import com.yeejoin.equipmanage.common.utils.HttpUtil;
import com.yeejoin.equipmanage.controller.Coordinate;
import com.yeejoin.equipmanage.fegin.IotFeign;
import com.yeejoin.equipmanage.listener.CarIotNewListener;
import com.yeejoin.equipmanage.mapper.WlCarMileageMapper;
import com.yeejoin.equipmanage.service.ICarService;
import com.yeejoin.equipmanage.service.IWlCarMileageService;
......@@ -304,11 +305,29 @@ public class WlCarMileageServiceImpl extends ServiceImpl<WlCarMileageMapper, WlC
log.info("轨迹切分定时任务数据过滤时间.............{}", nowDate);
List<WlCarMileage> list = this.baseMapper.list(nowDate);
log.info("需要切分数据, {}", list);
//销毁所有线程和所有上次坐标
//销毁所有线程
log.info("销毁所有有效线程开始");
CarIotNewListener.deviceInfo.keySet().forEach(s->{
try {
CarIotNewListener.deviceInfo.get(s).interrupt();
}catch (Exception e){
log.info("销毁有效线程失败"+e.getMessage());
}
});
CarIotNewListener.deviceInfo.clear();
log.info("销毁所有有效线程成功");
//销毁所有的坐标信息
log.info("销毁所有坐标信息开始");
CarIotNewListener.deviceLastInfo.clear();
log.info("销毁所有坐标信息成功");
log.info("------------------开始切分里程-------------------------------");
list.forEach(item -> {
Calendar calendar = Calendar.getInstance();
calendar.setTime(item.getDate());
calendar.add(Calendar.DATE, 1);
Date date = calendar.getTime();
//iotcode有效
if (!ObjectUtils.isEmpty(item.getIotCode()) && item.getIotCode().length() > 8) {
// 查询车辆上报信息
ResponseModel<List<Object>> result = iotFeign.getLiveData(item.getIotCode().substring(0, 8), item.getIotCode().substring(8),
......@@ -356,6 +375,7 @@ public class WlCarMileageServiceImpl extends ServiceImpl<WlCarMileageMapper, WlC
String address = getAddress(startLongitude, startLatitude);
// 里程耗时
long takeTime = (date.getTime()) - (item.getStartTime().getTime());
// 修改0点未结束里程记录
item.setEndSpeed(v.intValue());
item.setEndTime(date);
......@@ -366,33 +386,33 @@ public class WlCarMileageServiceImpl extends ServiceImpl<WlCarMileageMapper, WlC
item.setTakeTime(takeTime);
this.getBaseMapper().updateById(item);
log.info("-----------结束里程成功:::"+JSONObject.toJSONString(item)+"-----------------");
// 从0点开启新里程
item.setStartName(address);
item.setDate(date);
item.setId(null);
item.setEndSpeed(null);
item.setEndTime(null);
item.setEndLatitude(null);
item.setEndLongitude(null);
item.setEndName(null);
item.setTravel(null);
item.setTakeTime(null);
item.setStartSpeed(v.intValue());
item.setStartTime(item.getDate());
item.setStartLongitude(startLongitude);
item.setStartLatitude(startLatitude);
this.baseMapper.insert(item);
HashMap<String,String> messageMap = new HashMap<>();
messageMap.put("FireCar_Latitude", String.valueOf(startLatitude));
messageMap.put("FireCar_Longitude", String.valueOf(startLongitude));
messageMap.put("FireCar_Speed",String.valueOf(v.intValue()));
messageMap.put("time",String.valueOf(item.getDate().getTime()));
messageMap.put("name","轨迹切分消息!!!");
log.info("-----------新增开始里程成功:::"+JSONObject.toJSONString(item)+"-----------------");
try {
emqKeeper.getMqttClient().publish(item.getIotCode().substring(0, 8)+"/"+item.getIotCode().substring(8)+"/property",JSON.toJSON(messageMap).toString().getBytes("UTF-8"),1,false);
} catch (Exception e) {
}
// // 从0点开启新里程
// item.setStartName(address);
// item.setDate(date);
// item.setId(null);
// item.setEndSpeed(null);
// item.setEndTime(null);
// item.setEndLatitude(null);
// item.setEndLongitude(null);
// item.setEndName(null);
// item.setTravel(null);
// item.setTakeTime(null);
// item.setStartSpeed(v.intValue());
// item.setStartTime(item.getDate());
// item.setStartLongitude(startLongitude);
// item.setStartLatitude(startLatitude);
// this.baseMapper.insert(item);
// HashMap<String,String> messageMap = new HashMap<>();
// messageMap.put("FireCar_Latitude", String.valueOf(startLatitude));
// messageMap.put("FireCar_Longitude", String.valueOf(startLongitude));
// messageMap.put("FireCar_Speed",String.valueOf(v.intValue()));
// messageMap.put("time",String.valueOf(item.getDate().getTime()));
// messageMap.put("name","轨迹切分消息!!!");
// log.info("-----------新增开始里程成功:::"+JSONObject.toJSONString(item)+"-----------------");
// try {
// emqKeeper.getMqttClient().publish(item.getIotCode().substring(0, 8)+"/"+item.getIotCode().substring(8)+"/property",JSON.toJSON(messageMap).toString().getBytes("UTF-8"),1,false);
// } catch (Exception e) {
// }
//根据iotcode获取车辆并且同步经纬度到车辆
Car car = iCarService.getOne(new QueryWrapper<Car>().eq("iot_code", item.getIotCode()));
car.setLongitude(startLongitude);
......@@ -402,11 +422,11 @@ public class WlCarMileageServiceImpl extends ServiceImpl<WlCarMileageMapper, WlC
}
});
log.info("轨迹切分任务执行完成..............");
log.info("-------------------对于切割完成的数据进行倒计时操作----------------------------------");
ThreadCarMileageTreatment threadCarMileageTreatment = new ThreadCarMileageTreatment(this,iCarService , iotFeign);
log.info("-------------------对于切割完成的数据进行倒计时开始----------------------------------");
threadCarMileageTreatment.start();
log.info("-------------------对于切割完成的数据进行倒计时结束----------------------------------");
// log.info("-------------------对于切割完成的数据进行倒计时操作----------------------------------");
// ThreadCarMileageTreatment threadCarMileageTreatment = new ThreadCarMileageTreatment(this,iCarService , iotFeign);
// log.info("-------------------对于切割完成的数据进行倒计时开始----------------------------------");
// threadCarMileageTreatment.start();
// log.info("-------------------对于切割完成的数据进行倒计时结束----------------------------------");
}
@Override
......
......@@ -8,6 +8,7 @@ import com.yeejoin.equipmanage.common.entity.Car;
import com.yeejoin.equipmanage.common.entity.WlCarMileage;
import com.yeejoin.equipmanage.common.utils.CoordinateUtil;
import com.yeejoin.equipmanage.fegin.IotFeign;
import com.yeejoin.equipmanage.listener.CarIotNewListener;
import com.yeejoin.equipmanage.service.ICarService;
import com.yeejoin.equipmanage.service.IWlCarMileageService;
import com.yeejoin.equipmanage.utils.CarUtils;
......@@ -53,6 +54,9 @@ ThreadCar extends Thread {
// 获取最后一个有坐标的数据
JSONObject lastObj = null;
WlCarMileage last = null;
String measurement = topic.split("/")[0];
String deviceName = topic.split("/")[1];
String iotCode = measurement + deviceName;
try {
Long startTime = System.currentTimeMillis();
logger.info("=============================================" + topic + "结束坐标开始计时=======================================");
......@@ -60,9 +64,6 @@ ThreadCar extends Thread {
//业务处理
//如果十分钟没有坐标,则需要设置结束标记
// 获取结束坐标
String measurement = topic.split("/")[0];
String deviceName = topic.split("/")[1];
String iotCode = measurement + deviceName;
last = iWlCarMileageService
.getOne(new LambdaQueryWrapper<WlCarMileage>().eq(WlCarMileage::getIotCode, iotCode)
.isNull(WlCarMileage::getEndLongitude).isNull(WlCarMileage::getEndLatitude)
......@@ -136,6 +137,8 @@ ThreadCar extends Thread {
logger.info("---正常时获取到的获取last信息::" + JSONObject.toJSONString(last));
logger.info("--------------" + topic + "结束坐标成功::花费时间====" + String.valueOf((lastTime - startTime) / 60000) + "分钟-------------------------");
logger.info("============================================================更新结束坐标成功==============================================:" + topic);
//从缓存信息中移除运行完成的线程与坐标信息
CarIotNewListener.deviceInfo.remove(iotCode);
}
} catch (Exception exception) {
if (last != null) {
......@@ -144,7 +147,6 @@ ThreadCar extends Thread {
}
} finally {
logger.info("销毁车辆倒计时线程::topic_" + topic);
this.interrupt();
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment