Commit c8e8de47 authored by maoying's avatar maoying

Merge branch 'develop_dl' of http://39.98.45.134:8090/moa/amos-boot-biz into develop_dl

parents 48c973b9 9c47e72a
...@@ -9,9 +9,12 @@ import com.yeejoin.equipmanage.common.utils.CoordinateUtil; ...@@ -9,9 +9,12 @@ import com.yeejoin.equipmanage.common.utils.CoordinateUtil;
import com.yeejoin.equipmanage.fegin.IotFeign; import com.yeejoin.equipmanage.fegin.IotFeign;
import com.yeejoin.equipmanage.service.ICarService; import com.yeejoin.equipmanage.service.ICarService;
import com.yeejoin.equipmanage.service.IWlCarMileageService; import com.yeejoin.equipmanage.service.IWlCarMileageService;
import com.yeejoin.equipmanage.service.impl.WlCarMileageServiceImpl;
import com.yeejoin.equipmanage.thread.ThreadCar; import com.yeejoin.equipmanage.thread.ThreadCar;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.typroject.tyboot.component.emq.EmqKeeper; import org.typroject.tyboot.component.emq.EmqKeeper;
import org.typroject.tyboot.component.emq.EmqxListener; import org.typroject.tyboot.component.emq.EmqxListener;
...@@ -38,7 +41,8 @@ public class CarIotNewListener extends EmqxListener { ...@@ -38,7 +41,8 @@ public class CarIotNewListener extends EmqxListener {
@Autowired @Autowired
private IotFeign iotFeign; private IotFeign iotFeign;
@Value("${mileage.clippingtime}")
private Long clipping_time;
private final String GUIDE_KEY = "813684495d9a3981dd2c7694916fe404"; private final String GUIDE_KEY = "813684495d9a3981dd2c7694916fe404";
private final String GUIDE_URL = "https://restapi.amap.com/v3/geocode/regeo?"; private final String GUIDE_URL = "https://restapi.amap.com/v3/geocode/regeo?";
...@@ -50,156 +54,68 @@ public class CarIotNewListener extends EmqxListener { ...@@ -50,156 +54,68 @@ public class CarIotNewListener extends EmqxListener {
// 用于根据topicId 也就是物联设备id 存储对应的线程来进行计时 // 用于根据topicId 也就是物联设备id 存储对应的线程来进行计时
private static HashMap<String, ThreadCar> deviceInfo = new HashMap(); private static HashMap<String, ThreadCar> deviceInfo = new HashMap();
private static HashMap<String, String> deviceLastInfo = new HashMap();
@Override @Override
public void processMessage(String topic, MqttMessage message) throws Exception { public void processMessage(String topic, MqttMessage message) throws Exception {
System.out.println(topic);
System.out.println(message);
String measurement = topic.split("/")[0];
String deviceName = topic.split("/")[1];
String iotCode = measurement + deviceName;
JSONObject jsonObject = JSONObject.parseObject(message.toString()); JSONObject jsonObject = JSONObject.parseObject(message.toString());
//如果map中已经存在该设备或者该设备有但是线程已经执行了 //如果map中已经存在该设备或者该设备有但是线程已经执行了
if ((!deviceInfo.containsKey(topic)) || (deviceInfo.containsKey(topic) && deviceInfo.get(topic) == null)) { if ((!deviceInfo.containsKey(topic)) || (deviceInfo.containsKey(topic) && deviceInfo.get(topic) == null)) {
ThreadCar threadCar = new ThreadCar(topic, jsonObject); ThreadCar threadCar = new ThreadCar(topic, jsonObject, this.iWlCarMileageService, this.iotFeign, this.iCarService, this.emqkeeper, clipping_time);
deviceInfo.put(topic, threadCar); deviceInfo.put(topic, threadCar);
threadCar.start(); threadCar.start();
} else {
Long startTime = new Date().getTime();
Long endTime= jsonObject.getLong("time");
if((startTime-endTime) <=600000){
deviceInfo.get(topic).interrupt();
ThreadCar threadCar = new ThreadCar(topic, jsonObject);
deviceInfo.put(topic, threadCar);
threadCar.start();
}
} }
if (jsonObject.containsKey("startUp") || jsonObject.containsKey("FireCar_Longitude")) { String coordinate = jsonObject.getDoubleValue("FireCar_Longitude") +String.valueOf(jsonObject.getDoubleValue("FireCar_Latitude"));
System.out.println(topic); if(!deviceLastInfo.containsKey(topic)){
System.out.println(message); deviceLastInfo.put(topic,coordinate);
String measurement = topic.split("/")[0]; }
String deviceName = topic.split("/")[1]; Long currentTime = new Date().getTime();
String iotCode = measurement + deviceName; Long endTime = jsonObject.getLong("time");
if (jsonObject.containsKey("startUp")) { // if ((startTime - endTime) <= 600000) {
if (jsonObject.getBooleanValue("startUp")) { if (((currentTime - endTime) <= clipping_time)&&(!deviceLastInfo.get(topic).equals(coordinate))) {
WlCarMileage wlCarMileage = new WlCarMileage(); try {
wlCarMileage.setIotCode(iotCode); deviceInfo.get(topic).interrupt();
wlCarMileage.setDate(new Date()); } catch (Exception e) {
// 获取开始坐标 }
double startLongitude = jsonObject.getDoubleValue("FireCar_Longitude"); ThreadCar threadCar = new ThreadCar(topic, jsonObject, this.iWlCarMileageService, this.iotFeign, this.iCarService, this.emqkeeper, clipping_time);
double startLatitude = jsonObject.getDoubleValue("FireCar_Latitude"); deviceInfo.put(topic, threadCar);
// String currentTime = "20"+jsonObject.getString("currentTime"); //更新车辆的最新坐标数据
wlCarMileage.setStartLongitude(startLongitude); deviceLastInfo.put(topic,coordinate);
wlCarMileage.setStartLatitude(startLatitude); threadCar.start();
// Date startTime = UTCToCST(); }
Date startTime = new Date(jsonObject.getLong("time")); if (iWlCarMileageService.getUncompleteMileagByIotCode(iotCode)) {
wlCarMileage.setStartTime(startTime); WlCarMileage wlCarMileage = new WlCarMileage();
wlCarMileage.setStartName(getAddress(startLongitude, startLatitude)); wlCarMileage.setIotCode(iotCode);
wlCarMileage.setStartSpeed(jsonObject.getIntValue("FireCar_Speed")); wlCarMileage.setDate(new Date());
try { // 获取开始坐标
iWlCarMileageService.save(wlCarMileage); double startLongitude = jsonObject.getDoubleValue("FireCar_Longitude");
} catch (Exception e) { double startLatitude = jsonObject.getDoubleValue("FireCar_Latitude");
e.printStackTrace(); // String currentTime = "20"+jsonObject.getString("currentTime");
iWlCarMileageService.save(wlCarMileage); wlCarMileage.setStartLongitude(startLongitude);
} wlCarMileage.setStartLatitude(startLatitude);
} else { // Date startTime = UTCToCST();
// 获取结束坐标 Date startTime = new Date(jsonObject.getLong("time"));
WlCarMileage last = iWlCarMileageService wlCarMileage.setStartTime(startTime);
.getOne(new LambdaQueryWrapper<WlCarMileage>().eq(WlCarMileage::getIotCode, iotCode) wlCarMileage.setStartName(getAddress(startLongitude, startLatitude));
.isNull(WlCarMileage::getEndLongitude).isNull(WlCarMileage::getEndLatitude) wlCarMileage.setStartSpeed(jsonObject.getIntValue("FireCar_Speed"));
.orderByDesc(WlCarMileage::getStartTime).last("limit 1")); try {
ResponseModel<List<Object>> result = iotFeign.getLiveData(measurement, deviceName, iWlCarMileageService.save(wlCarMileage);
last.getStartTime(), new Date(new Date().getTime() + 2000)); } catch (Exception e) {
List<Object> list = result.getResult(); e.printStackTrace();
if (list != null && list.size() > 0) { iWlCarMileageService.save(wlCarMileage);
// 获取最后一个有坐标的数据
JSONObject lastObj = null;
// 过滤空坐标
List<Object> filterList = new ArrayList<Object>();
for (int i = 0; i < list.size(); i++) {
JSONObject Obj = JSONObject.parseObject(JSONObject.toJSONString(list.get(i)));
if (Obj.get("FireCar_Longitude") != null && Obj.get("FireCar_Latitude") != null
&& Obj.getDoubleValue("FireCar_Longitude") != 0
&& Obj.getDoubleValue("FireCar_Latitude") != 0) {
filterList.add(list.get(i));
// 获取第一个不为空的坐标
if (lastObj == null) {
lastObj = Obj;
}
}
}
// JSONObject lastObj =
// JSONObject.parseObject(JSONObject.toJSONString(list.get(list.size() - 1)));
if (lastObj == null) {
lastObj = new JSONObject();
lastObj.put("FireCar_Longitude", 0.0);
lastObj.put("FireCar_Latitude", 0.0);
lastObj.put("time", 0);
lastObj.put("FireCar_Speed", 0);
}
double endLongitude = lastObj.getDoubleValue("FireCar_Longitude");
double endLatitude = lastObj.getDoubleValue("FireCar_Latitude");
// 230215180624
// Date endTime =UTCToCST(lastObj.getString("time"));
Date endTime = new Date(jsonObject.getLong("time"));
long takeTime = (endTime.getTime() / 1000 * 1000) - (last.getStartTime().getTime() / 1000 * 1000);
last.setEndLongitude(endLongitude);
last.setEndLatitude(endLatitude);
last.setEndTime(endTime);
last.setEndName(getAddress(endLongitude, endLatitude));
last.setEndSpeed(lastObj.getIntValue("FireCar_Speed"));
last.setTakeTime(takeTime);
double travel = 0.0;
// 获取里程
for (int i = 0; i < filterList.size() - 1; i++) {
JSONObject start = JSONObject.parseObject(JSONObject.toJSONString(filterList.get(i)));
JSONObject end = JSONObject.parseObject(JSONObject.toJSONString(filterList.get(i + 1)));
travel += CoordinateUtil.distance(start.getDoubleValue("FireCar_Latitude"),
start.getDoubleValue("FireCar_Longitude"), end.getDoubleValue("FireCar_Latitude"),
end.getDoubleValue("FireCar_Longitude"));
}
last.setTravel(new BigDecimal(travel / 1000).setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue());
try {
iWlCarMileageService.updateById(last);
} catch (Exception e) {
iWlCarMileageService.updateById(last);
}
}
}
} else if (jsonObject.containsKey("FireCar_Longitude")) {
if (jsonObject.containsKey("FireCar_Longitude") && jsonObject.containsKey("FireCar_Latitude")) {
// 获取开始坐标
double startLongitude = jsonObject.getDoubleValue("FireCar_Longitude");
double startLatitude = jsonObject.getDoubleValue("FireCar_Latitude");
int direction = jsonObject.getIntValue("direction");
// 地图推送消息
Car car = iCarService.getOne(new LambdaQueryWrapper<Car>().eq(Car::getIotCode, iotCode));
if (car != null && startLongitude != 0 && startLatitude != 0) {
JSONArray sendArr = new JSONArray();
JSONObject sendObj = new JSONObject();
sendObj.put("id", String.valueOf(car.getId()));
sendObj.put("direction", direction);
sendObj.put("longitude", String.valueOf(startLongitude));
sendObj.put("latitude", String.valueOf(startLatitude));
sendObj.put("carNum", car.getCarNum());
sendObj.put("bizOrgName", car.getBizOrgName());
sendArr.add(sendObj);
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(sendArr.toJSONString().getBytes());
car.setLongitude(startLongitude);
car.setLatitude(startLatitude);
iCarService.updateById(car);
emqkeeper.getMqttClient().publish("car/location", mqttMessage);
}
}
} }
} }
} }
public String getAddress(double longitude, double lantitude) { public String getAddress(double longitude, double lantitude) {
StringBuilder api = new StringBuilder(GUIDE_URL); StringBuilder api = new StringBuilder(GUIDE_URL);
api.append("key=").append(GUIDE_KEY).append("&location=").append(longitude).append(",").append(lantitude) api.append("key=").append(GUIDE_KEY).append("&location=").append(longitude).append(",").append(lantitude).append("&radius=1000").append("&batch=false").append("&extensions=base").append("&roadlevel=0").append("&batch=false");
.append("&radius=1000").append("&batch=false").append("&extensions=base").append("&roadlevel=0")
.append("&batch=false");
StringBuilder res = new StringBuilder(); StringBuilder res = new StringBuilder();
BufferedReader in = null; BufferedReader in = null;
try { try {
...@@ -244,4 +160,5 @@ public class CarIotNewListener extends EmqxListener { ...@@ -244,4 +160,5 @@ public class CarIotNewListener extends EmqxListener {
calendar.set(Calendar.HOUR, calendar.get(Calendar.HOUR) + 8); calendar.set(Calendar.HOUR, calendar.get(Calendar.HOUR) + 8);
return calendar.getTime(); return calendar.getTime();
} }
} }
...@@ -30,6 +30,7 @@ public interface IWlCarMileageService extends IService<WlCarMileage> { ...@@ -30,6 +30,7 @@ public interface IWlCarMileageService extends IService<WlCarMileage> {
* 里程切分(0点若里程未结束,自动切分设置结束信息,并开始新里程) * 里程切分(0点若里程未结束,自动切分设置结束信息,并开始新里程)
*/ */
void mileageSegmentation(); void mileageSegmentation();
//根据iot编码查询是否有未结束里程
Boolean getUncompleteMileagByIotCode(String iotCode);
} }
...@@ -3,6 +3,7 @@ package com.yeejoin.equipmanage.service.impl; ...@@ -3,6 +3,7 @@ package com.yeejoin.equipmanage.service.impl;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
...@@ -384,6 +385,12 @@ public class WlCarMileageServiceImpl extends ServiceImpl<WlCarMileageMapper, WlC ...@@ -384,6 +385,12 @@ public class WlCarMileageServiceImpl extends ServiceImpl<WlCarMileageMapper, WlC
log.info("轨迹切分任务执行完成.............."); log.info("轨迹切分任务执行完成..............");
} }
@Override
public Boolean getUncompleteMileagByIotCode(String iotCode) {
Integer integer = this.count(new QueryWrapper<WlCarMileage>().select("1").lambda().eq(WlCarMileage::getIotCode,iotCode).isNull(WlCarMileage::getEndTime));
return integer <= 0 ;
}
private String getAddress(double longitude, double lantitude) { private String getAddress(double longitude, double lantitude) {
StringBuilder api = new StringBuilder(GUIDE_ADDRESS_URL); StringBuilder api = new StringBuilder(GUIDE_ADDRESS_URL);
......
package com.yeejoin.equipmanage.thread; package com.yeejoin.equipmanage.thread;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.yeejoin.equipmanage.common.entity.Car;
import com.yeejoin.equipmanage.common.entity.WlCarMileage; import com.yeejoin.equipmanage.common.entity.WlCarMileage;
import com.yeejoin.equipmanage.common.utils.CoordinateUtil; import com.yeejoin.equipmanage.common.utils.CoordinateUtil;
import com.yeejoin.equipmanage.fegin.IotFeign; import com.yeejoin.equipmanage.fegin.IotFeign;
import com.yeejoin.equipmanage.service.ICarService;
import com.yeejoin.equipmanage.service.IWlCarMileageService; import com.yeejoin.equipmanage.service.IWlCarMileageService;
import com.yeejoin.equipmanage.utils.CarUtils; import com.yeejoin.equipmanage.utils.CarUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.typroject.tyboot.component.emq.EmqKeeper;
import org.typroject.tyboot.core.restful.utils.ResponseModel; import org.typroject.tyboot.core.restful.utils.ResponseModel;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Timer;
import java.util.concurrent.TimeUnit;
public class ThreadCar extends Thread { public class ThreadCar extends Thread {
@Autowired
private IWlCarMileageService iWlCarMileageService; private IWlCarMileageService iWlCarMileageService;
@Autowired
private IotFeign iotFeign; private IotFeign iotFeign;
private String topic; private String topic;
private Long clippingTime;
ICarService iCarService;
private EmqKeeper emqkeeper;
private JSONObject jsonObject; private JSONObject jsonObject;
public ThreadCar(String topic, JSONObject jsonObject){
public ThreadCar(String topic, JSONObject jsonObject, IWlCarMileageService iWlCarMileageService, IotFeign iotFeign, ICarService iCarService, EmqKeeper emqkeeper, Long clippingTime) {
this.topic = topic; this.topic = topic;
this.jsonObject = jsonObject; this.jsonObject = jsonObject;
this.iWlCarMileageService = iWlCarMileageService;
this.iotFeign = iotFeign;
this.iCarService = iCarService;
this.emqkeeper = emqkeeper;
this.clippingTime = clippingTime;
} }
@Override @Override
public void run() { public void run() {
//toDo //toDo
// 获取最后一个有坐标的数据 // 获取最后一个有坐标的数据
JSONObject lastObj = null; JSONObject lastObj = null;
WlCarMileage last= null; WlCarMileage last = null;
try { try {
Thread.sleep(600000); TimeUnit.MILLISECONDS.sleep(Long.valueOf((long) (clippingTime)));
//业务处理 //业务处理
//如果十分钟没有坐标,则需要设置结束标记 //如果十分钟没有坐标,则需要设置结束标记
// 获取结束坐标 // 获取结束坐标
String measurement = topic.split("/")[0]; String measurement = topic.split("/")[0];
String deviceName = topic.split("/")[1]; String deviceName = topic.split("/")[1];
String iotCode = measurement + deviceName; String iotCode = measurement + deviceName;
last = iWlCarMileageService last = iWlCarMileageService
.getOne(new LambdaQueryWrapper<WlCarMileage>().eq(WlCarMileage::getIotCode, iotCode) .getOne(new LambdaQueryWrapper<WlCarMileage>().eq(WlCarMileage::getIotCode, iotCode)
.isNull(WlCarMileage::getEndLongitude).isNull(WlCarMileage::getEndLatitude) .isNull(WlCarMileage::getEndLongitude).isNull(WlCarMileage::getEndLatitude)
.orderByDesc(WlCarMileage::getStartTime).last("limit 1")); .orderByDesc(WlCarMileage::getStartTime).last("limit 1"));
...@@ -79,7 +94,7 @@ public class ThreadCar extends Thread { ...@@ -79,7 +94,7 @@ public class ThreadCar extends Thread {
// 230215180624 // 230215180624
// Date endTime =UTCToCST(lastObj.getString("time")); // Date endTime =UTCToCST(lastObj.getString("time"));
Date endTime = new Date(jsonObject.getLong("time")); Date endTime = new Date();
long takeTime = (endTime.getTime() / 1000 * 1000) - (last.getStartTime().getTime() / 1000 * 1000); long takeTime = (endTime.getTime() / 1000 * 1000) - (last.getStartTime().getTime() / 1000 * 1000);
last.setEndLongitude(endLongitude); last.setEndLongitude(endLongitude);
last.setEndLatitude(endLatitude); last.setEndLatitude(endLatitude);
...@@ -98,10 +113,44 @@ public class ThreadCar extends Thread { ...@@ -98,10 +113,44 @@ public class ThreadCar extends Thread {
} }
last.setTravel(new BigDecimal(travel / 1000).setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue()); last.setTravel(new BigDecimal(travel / 1000).setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue());
iWlCarMileageService.updateById(last); iWlCarMileageService.updateById(last);
this.pushCarInfoToMap(jsonObject, topic);
this.interrupt(); this.interrupt();
} }
} catch (Exception exception) { } catch (Exception exception) {
iWlCarMileageService.updateById(last);
}
}
public void pushCarInfoToMap(JSONObject jsonObject, String iotCode) {
if (jsonObject.containsKey("FireCar_Longitude") && jsonObject.containsKey("FireCar_Latitude")) {
// 获取开始坐标
double startLongitude = jsonObject.getDoubleValue("FireCar_Longitude");
double startLatitude = jsonObject.getDoubleValue("FireCar_Latitude");
int direction = jsonObject.getIntValue("direction");
// 地图推送消息
Car car = iCarService.getOne(new LambdaQueryWrapper<Car>().eq(Car::getIotCode, iotCode));
if (car != null && startLongitude != 0 && startLatitude != 0) {
JSONArray sendArr = new JSONArray();
JSONObject sendObj = new JSONObject();
sendObj.put("id", String.valueOf(car.getId()));
sendObj.put("direction", direction);
sendObj.put("longitude", String.valueOf(startLongitude));
sendObj.put("latitude", String.valueOf(startLatitude));
sendObj.put("carNum", car.getCarNum());
sendObj.put("bizOrgName", car.getBizOrgName());
sendArr.add(sendObj);
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(sendArr.toJSONString().getBytes());
car.setLongitude(startLongitude);
car.setLatitude(startLatitude);
iCarService.updateById(car);
try {
emqkeeper.getMqttClient().publish("car/location", mqttMessage);
} catch (MqttException e) {
throw new RuntimeException(e);
}
}
} }
} }
} }
......
package com.yeejoin.equipmanage.utils; package com.yeejoin.equipmanage.utils;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.yeejoin.equipmanage.common.entity.Car;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment