Commit 8fb0d1e6 authored by tangwei's avatar tangwei

解决冲突

parents b1598822 fd0085fd
......@@ -11,7 +11,7 @@ public class RedisKey {
/**根据动态表单code获取动态表单列表*/
public static final String FORM_CODE = "form_code_";
/**根据字典code获取数据字典列表*/
public static final String DATA_DICTIONARY_CODE= "data_dictionary_code_";
public static final String DATA_DICTIONARY_CODE= "data_dictionary_code_";
/**根据字典code获取数据字典列表*/
public static final String DATA_DICTIONARY_CODE_XIN= "data_dictionary_code_xin_";
/**根据id获取消防人员基本信息*/
......@@ -42,7 +42,12 @@ public class RedisKey {
/** 企业用户注册前缀 */
public static final String FLC_USER_TEL = "flc_tel_";
/**
* 装备指标Key值
*/
public static final String EQUIP_INDEX_ADDRESS = "equip_index_address";
/** 驼峰转下划线(简单写法,效率低于 ) */
public static String humpToLine(String str) {
return str.replaceAll("[A-Z]", "_$0").toLowerCase();
......@@ -69,7 +74,7 @@ public class RedisKey {
public static String buildPatternKey(String token) {
return REGION_REDIS_PREFIX + "*" + "_" + token;
}
/**
* 判断str1中包含str2的个数
* @param str1
......
......@@ -69,4 +69,13 @@ public class EquipmentIndexVO {
@ApiModelProperty(value = "指标枚举")
private String valueEnum;
@ApiModelProperty(value = "信号的索引键key,用于唯一索引信号")
private String indexAddress;
@ApiModelProperty(value = "测点类型,analog/state")
private String dataType;
@ApiModelProperty(value = "网关标识")
private String gatewayId;
}
......@@ -56,6 +56,12 @@
<artifactId>amos-component-security</artifactId>
<version>1.7.13-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.yeejoin</groupId>
<artifactId>amos-component-influxdb</artifactId>
<version>1.8.5-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
package com.yeejoin.equipmanage.config;
import com.yeejoin.amos.boot.biz.common.utils.RedisKey;
import com.yeejoin.amos.boot.biz.common.utils.RedisUtils;
import com.yeejoin.equipmanage.common.entity.vo.EquipmentIndexVO;
import com.yeejoin.equipmanage.mapper.EquipmentSpecificIndexMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* @author LiuLin
* @date 2023/6/15
* @apiNote
*/
@Component
@Slf4j
public class EquipmentIndexCacheRunner implements CommandLineRunner {
@Resource
private EquipmentSpecificIndexMapper equipmentSpecificIndexMapper;
@Resource
private RedisUtils redisUtils;
@Override
public void run(String... args) throws Exception {
log.info(">>服务启动执行,执行预加载数据等操作");
redisUtils.del(RedisKey.EQUIP_INDEX_ADDRESS);
List<EquipmentIndexVO> equipSpecificIndexList = equipmentSpecificIndexMapper.getEquipSpecificIndexList(null);
Map<String, Object> equipmentIndexVOMap = equipSpecificIndexList.stream()
.filter(v -> v.getGatewayId() != null)
.collect(Collectors.toMap(vo -> vo.getIndexAddress() + "_" + vo.getGatewayId(), Function.identity()));
redisUtils.hmset(RedisKey.EQUIP_INDEX_ADDRESS, equipmentIndexVOMap);
}
}
......@@ -64,10 +64,10 @@ public class EquipmentIotMqttReceiveConfig {
@Value("${patrol.center.risk.topic}")
private String riskMsgCenterPatrolTopic;
private EquipmentSpecificMapper equipmentSpecificMapper;
private EquipmentSpecificMapper equipmentSpecificMapper;
private MqttEventReceiveService mqttEventReceiveService;
private MqttEventReceiveService mqttEventReceiveService;
private ISyncDataService iSyncDataService;
......@@ -76,10 +76,12 @@ public class EquipmentIotMqttReceiveConfig {
public void setEquipmentSpecificMapper(EquipmentSpecificMapper equipmentSpecificMapper) {
this.equipmentSpecificMapper = equipmentSpecificMapper;
}
@Autowired
public void setMqttEventReceiveService(MqttEventReceiveService mqttEventReceiveService) {
this.mqttEventReceiveService = mqttEventReceiveService;
}
@Autowired
public void setiSyncDataService(ISyncDataService iSyncDataService) {
this.iSyncDataService = iSyncDataService;
......@@ -156,10 +158,10 @@ public class EquipmentIotMqttReceiveConfig {
mqttReceiveService.handlerMqttIncrementMessage(topic, msg);
} else if (dataType.equals("event") && StringUtil.isNotEmpty(msg)) {
mqttEventReceiveService.handlerMqttIncrementMessage(topic, msg);
}else if (dataType.equals("transmit") && StringUtil.isNotEmpty(msg)){
mqttReceiveService.handlerMqttRomaMessage(topic,msg);
}else if (dataType.equals("perspective") && StringUtil.isNotEmpty(msg)){
mqttReceiveService.handlerMqttIotMessage(topic,msg);
} else if (dataType.equals("transmit") && StringUtil.isNotEmpty(msg)) {
mqttReceiveService.handlerMqttRomaMessage(topic, msg);
} else if (dataType.equals("perspective") && StringUtil.isNotEmpty(msg)) {
mqttReceiveService.handlerIotMessage(topic, msg);
} else if (dataType.equals("trigger") && StringUtil.isNotEmpty(msg)) {
mqttReceiveService.handleDataToRiskModel(topic, msg);
}
......
......@@ -224,7 +224,7 @@ public class CarController extends AbstractBaseController {
}
});
}
if (ObjectUtils.isEmpty(car.getName())) {
if (!ObjectUtils.isEmpty(car.getEquipmentId())) {
Equipment equipment = iEquipmentService.getById(car.getEquipmentId());
car.setName(equipment != null ? equipment.getName() : null);
}
......@@ -1444,8 +1444,8 @@ public class CarController extends AbstractBaseController {
@TycloudOperation(ApiLevel = UserType.AGENCY,needAuth = false)
@RequestMapping(value = "/getCarAreaInfo", method = RequestMethod.GET)
@ApiOperation(httpMethod = "GET", value = "车辆统计-获取片区车辆归属信息接口", notes = "车辆统计-获取片区车辆归属信息接口")
public Page<CarAreaInfoDto> getCarAreaInfo(@RequestParam Long id) {
return iCarService.getCarAreaInfo(id);
public Page<CarAreaInfoDto> getCarAreaInfo(@RequestParam String areaName) {
return iCarService.getCarAreaInfo(areaName);
}
@TycloudOperation(ApiLevel = UserType.AGENCY,needAuth = false)
@RequestMapping(value = "/getCarMileageInfoByMoth", method = RequestMethod.GET)
......@@ -1455,11 +1455,11 @@ public class CarController extends AbstractBaseController {
}
@TycloudOperation(ApiLevel = UserType.AGENCY,needAuth = false)
@RequestMapping(value = "/exportCarMileageInfoByMoth", method = RequestMethod.GET)
@ApiOperation(httpMethod = "GET", value = "车辆统计-按照月份统计车辆里程", notes = "车辆统计-按照月份统计车辆里程")
@ApiOperation(httpMethod = "GET", value = "车辆统计-按照月份统计导出车辆里程", notes = "车辆统计-按照月份统计导出车辆里程")
public void exportCarMileageInfoByMoth(@RequestParam String date, HttpServletResponse response) {
List<CarExportDto> list = this.iCarService.exportCarMileageInfoByMoth(date);
String name = "车辆里程月度统计表-"+date;
FileHelper.exportExcel(list,name,name,CarExportDto.class,UUID.randomUUID().toString()+".xls",response);
List<CarExportDto> list = this.iCarService.exportCarMileageInfoByMoth(date);
String name = "车辆里程月度统计表-"+date;
FileHelper.exportExcel(list,name,name,CarExportDto.class,UUID.randomUUID().toString()+".xls",response);
}
@TycloudOperation(ApiLevel = UserType.AGENCY,needAuth = false)
@RequestMapping(value = "/getCarMileageInfoByMothOFDay", method = RequestMethod.GET)
......
......@@ -28,6 +28,7 @@ import com.yeejoin.equipmanage.service.impl.FireFightingSystemServiceImpl;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.transaction.annotation.Transactional;
......@@ -421,19 +422,19 @@ public class EquipmentSpecificController extends AbstractBaseController {
EquipmentSpecific spec = equipmentSpecificSerivce.getBaseMapper().selectById(id);
int oldStatus = Integer.valueOf(spec.getEquipStatus());
int newStatus = Integer.valueOf(status);
if (oldStatus >= newStatus){
//保存 wl_equip_qrcode_record 二维码记录
EquipQrcodeRecord equipQrcodeRecord = new EquipQrcodeRecord();
equipQrcodeRecord.setEquipid(Long.valueOf(id));
equipQrcodeRecord.setSource(source);
equipQrcodeRecord.setAlarmTime(new Date());
equipQrcodeRecord.setStatus(status);
equipQrcodeRecord.setSourceId(Long.valueOf(sourceId));
equipQrcodeRecord.setBizOrgCode(spec.getBizOrgCode());
equipQrcodeRecord.setBizOrgName(spec.getBizOrgName());
equipQrcodeRecord.setSystemCode(spec.getSystemId());
equipQrcodeRecordMapper.insert(equipQrcodeRecord);
}else {
//保存 wl_equip_qrcode_record 二维码记录
EquipQrcodeRecord equipQrcodeRecord = new EquipQrcodeRecord();
equipQrcodeRecord.setEquipid(Long.valueOf(id));
equipQrcodeRecord.setSource(source);
equipQrcodeRecord.setAlarmTime(new Date());
equipQrcodeRecord.setStatus(status);
equipQrcodeRecord.setSourceId(Long.valueOf(sourceId));
equipQrcodeRecord.setBizOrgCode(spec.getBizOrgCode());
equipQrcodeRecord.setBizOrgName(spec.getBizOrgName());
equipQrcodeRecord.setSystemCode(spec.getSystemId());
equipQrcodeRecordMapper.insert(equipQrcodeRecord);
if (oldStatus < newStatus){ //当前状态小于事件状态时 取更高级别状态赋码
equipmentSpecificSerivce.updateEquipSpecificStatus(status, id);
}
return CommonResponseUtil.success();
......@@ -441,17 +442,29 @@ public class EquipmentSpecificController extends AbstractBaseController {
@GetMapping(value = "/status/checkInput")
@TycloudOperation(ApiLevel = UserType.AGENCY)
@ApiOperation(httpMethod = "GET", value = "idx修改巡检项对应装备二维码状态", notes = "idx修改巡检项对应装备二维码状态")
public ResponseModel updateEquipSpecificStatusByCheckInput( String id){
@ApiOperation(httpMethod = "GET", value = "idx修改对应装备二维码状态", notes = "idx修改对应装备二维码状态")
public ResponseModel updateEquipSpecificStatusByCheckInput( @RequestParam(value = "id") String id,@RequestParam(value = "equipId",required = false) String equipId ){
//查询 巡检项所绑定装备
String equipId = equipmentSpecificSerivce.updateEquipSpecificStatusByCheckInput(id);
if (StringUtils.isEmpty(equipId)){
equipId = equipmentSpecificSerivce.updateEquipSpecificStatusByCheckInput(id);
}
LambdaQueryWrapper<EquipQrcodeRecord> query = new LambdaQueryWrapper<>();
query.eq(EquipQrcodeRecord::getEquipid,equipId);
query.notIn(EquipQrcodeRecord::getSourceId,id);
query.isNull(EquipQrcodeRecord::getCleanTime);
List<EquipQrcodeRecord> equipQrcodeRecords = equipQrcodeRecordMapper.selectList(query);
if (equipQrcodeRecords.size() == 0) { //如果记录表中此装备无未消除的故障等 则恢复绿码
//修改装备二维码状态为合格 此处为0代表绿色 是idx只有在合格是才会触发此接口
equipmentSpecificSerivce.updateEquipSpecificStatus("0", equipId);
} else {
//先修改本次清除的装备事件
EquipQrcodeRecord equipQrcodeRecord = equipQrcodeRecords.stream().filter(e -> e.getSourceId().equals(id)).findFirst().get();
equipQrcodeRecords.remove(equipQrcodeRecord);
equipQrcodeRecord.setCleanTime(new Date());
equipQrcodeRecordMapper.updateById(equipQrcodeRecord);
//然后取结果集中剩余状态最高的颜色赋码
String status = equipQrcodeRecords.stream().sorted(Comparator.comparing(EquipQrcodeRecord::getStatus)).findFirst().get().getStatus();
equipmentSpecificSerivce.updateEquipSpecificStatus(status, equipId);
}
return CommonResponseUtil.success();
}
......
......@@ -71,7 +71,7 @@ public class CarIotNewListener extends EmqxListener {
this.updateCarLocation(jsonObject, iotCode);
//如果map中已经存在该设备或者该设备有但是线程已经执行了
if ((!deviceInfo.containsKey(iotCode)) || (deviceInfo.containsKey(iotCode) && deviceInfo.get(iotCode) == null)) {
ThreadCar threadCar = new ThreadCar(iotCode, jsonObject, this.iWlCarMileageService, this.iotFeign, this.iCarService, this.emqkeeper, clipping_time);
ThreadCar threadCar = new ThreadCar(topic, jsonObject, this.iWlCarMileageService, this.iotFeign, this.iCarService, this.emqkeeper, clipping_time);
deviceInfo.put(iotCode, threadCar);
threadCar.start();
}
......@@ -87,7 +87,7 @@ public class CarIotNewListener extends EmqxListener {
deviceInfo.get(iotCode).interrupt();
} catch (Exception e) {
}
ThreadCar threadCar = new ThreadCar(iotCode, jsonObject, this.iWlCarMileageService, this.iotFeign, this.iCarService, this.emqkeeper, clipping_time);
ThreadCar threadCar = new ThreadCar(topic, jsonObject, this.iWlCarMileageService, this.iotFeign, this.iCarService, this.emqkeeper, clipping_time);
deviceInfo.put(iotCode, threadCar);
//更新车辆的最新坐标数据
deviceLastInfo.put(iotCode, coordinate);
......@@ -104,7 +104,8 @@ public class CarIotNewListener extends EmqxListener {
wlCarMileage.setStartLongitude(startLongitude);
wlCarMileage.setStartLatitude(startLatitude);
// Date startTime = UTCToCST();
Date startTime = new Date(jsonObject.getLong("time"));
//时间值被mysql自动转换
Date startTime = new Date((jsonObject.getLong("time")/1000)*1000);
wlCarMileage.setStartTime(startTime);
wlCarMileage.setStartName(getAddress(startLongitude, startLatitude));
wlCarMileage.setStartSpeed(Double.valueOf(jsonObject.getDoubleValue("FireCar_Speed")).intValue());
......
......@@ -202,7 +202,7 @@ public interface ICarService extends IService<Car> {
Page<CarEquipStateInfoDto> getCarEquipStateInfo();
Page<CarEquipAlarmInfoDto> getCarEquipAlarmInfoDto();
List<ChartIntegerDto>getCarBelongAreaInfo() ;
Page<CarAreaInfoDto> getCarAreaInfo(Long id ) ;
Page<CarAreaInfoDto> getCarAreaInfo(String areaName ) ;
Page<MileageDto> getCarMileageInfoByMoth(String date);
List<CarExportDto> exportCarMileageInfoByMoth(String date);
ZZChartsDto getCarMileageInfoByMothOFDay(String iotCode);
......
......@@ -13,28 +13,41 @@ public interface MqttReceiveService {
/**
* 增量数据处理
* @param topic 主题
*
* @param topic 主题
* @param message 消息内容
*/
void handlerMqttIncrementMessage(String topic, String message);
/**
* 处理交换站消息数据
* @param topic 主题
*
* @param topic 主题
* @param message 消息内容
*/
void handlerMqttRomaMessage(String topic, String message);
/**
* 处理Iot消息数据
* @param topic 主题
*
* @param topic 主题
* @param message 消息内容
*/
void handlerMqttIotMessage(String topic, String message);
/**
/**
* 中心级接收消息发送至消息服务
*
* @param topic
* @param message
*/
void handleDataToRiskModel(String topic, String message);
/**
* 处理Iot消息数据
*
* @param topic 主题
* @param message 消息内容
*/
void handlerIotMessage(String topic, String message);
}
......@@ -1826,23 +1826,26 @@ public class CarServiceImpl extends ServiceImpl<CarMapper, Car> implements ICarS
}
@Override
public Page<CarAreaInfoDto> getCarAreaInfo( Long id) {
public Page<CarAreaInfoDto> getCarAreaInfo( String areaName) {
Page<CarAreaInfoDto> page = new Page<>();
List<CarAreaInfoDto> list = new ArrayList<>();
FeignClientResult<java.util.Collection<CompanyModel>> de = Privilege.companyClient.querySubAgencyTree(id);
//如果是公司则获取所有片区
ArrayList<CompanyModel> companyModels = (ArrayList<CompanyModel>) de.getResult();
if (companyModels.size() >0 ) {
for (int i = 0; i < companyModels.size(); i++) {
CompanyModel companyModel = companyModels.get(i);
CarAreaInfoDto carAreaInfoDto = new CarAreaInfoDto();
Integer count = this.count(new QueryWrapper<Car>().like("biz_org_code", companyModel.getOrgCode()));
carAreaInfoDto.setNo(i+1);
carAreaInfoDto.setName(companyModel.getCompanyName());
carAreaInfoDto.setCountOfCar(count);
list.add(carAreaInfoDto);
}
}
CompanyModel companyModelArea= Privilege.companyClient.queryByCompanyName(areaName).getResult();
if(companyModelArea!=null){
FeignClientResult<java.util.Collection<CompanyModel>> de = Privilege.companyClient.querySubAgencyTree(companyModelArea.getSequenceNbr());
//如果是公司则获取所有片区
ArrayList<CompanyModel> companyModels = (ArrayList<CompanyModel>) de.getResult();
if (companyModels.size() >0 ) {
for (int i = 0; i < companyModels.size(); i++) {
CompanyModel companyModel = companyModels.get(i);
CarAreaInfoDto carAreaInfoDto = new CarAreaInfoDto();
Integer count = this.count(new QueryWrapper<Car>().like("biz_org_code", companyModel.getOrgCode()));
carAreaInfoDto.setNo(i+1);
carAreaInfoDto.setName(companyModel.getCompanyName());
carAreaInfoDto.setCountOfCar(count);
list.add(carAreaInfoDto);
}
}
}
page.setRecords(list);
page.setTotal(list.size());
page.setCurrent(1);
......
......@@ -1714,16 +1714,18 @@ public class EquipmentSpecificSerivceImpl extends ServiceImpl<EquipmentSpecificM
@Override
public void updateEquipmentSpecIndexRealtimeData(EquipmentSpecificIndex index) {
// TODO Auto-generated method stub
if (!ObjectUtils.isEmpty(index)) {
if (!ObjectUtils.isEmpty(index.getEquipmentSpecificId())) {
EquipmentSpecific es = equipmentSpecificMapper.selectById(index.getEquipmentSpecificId());
es.setRealtimeIotEsIndexId(index.getId());
es.setRealtimeIotIndexKey(index.getNameKey());
es.setRealtimeIotIndexName(index.getEquipmentSpecificIndexName());
es.setRealtimeIotIndexValue(index.getValue());
es.setRealtimeIotIndexId(index.getEquipmentIndexId());
es.setRealtimeIotIndexUpdateDate(index.getUpdateDate());
es.setValueLabel(index.getValueLabel());
equipmentSpecificMapper.updateById(es);
if(!ObjectUtils.isEmpty(es)){
es.setRealtimeIotEsIndexId(index.getId());
es.setRealtimeIotIndexKey(index.getNameKey());
es.setRealtimeIotIndexName(index.getEquipmentSpecificIndexName());
es.setRealtimeIotIndexValue(index.getValue());
es.setRealtimeIotIndexId(index.getEquipmentIndexId());
es.setRealtimeIotIndexUpdateDate(index.getUpdateDate());
es.setValueLabel(index.getValueLabel());
equipmentSpecificMapper.updateById(es);
}
}
}
......
......@@ -5,7 +5,9 @@ import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.yeejoin.amos.boot.biz.common.utils.RedisKey;
import com.yeejoin.amos.boot.biz.common.utils.RedisUtils;
import com.yeejoin.amos.component.influxdb.InfluxDbConnection;
import com.yeejoin.amos.feign.privilege.model.DepartmentModel;
import com.yeejoin.amos.feign.systemctl.model.MessageModel;
import com.yeejoin.equipmanage.common.datasync.entity.FireEquipmentDefectAlarm;
......@@ -36,6 +38,7 @@ import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronization;
......@@ -48,12 +51,15 @@ import org.typroject.tyboot.core.foundation.utils.ValidationUtil;
import org.typroject.tyboot.core.restful.exception.instance.BadRequest;
import org.typroject.tyboot.core.restful.utils.ResponseModel;
import javax.annotation.PostConstruct;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
......@@ -118,6 +124,12 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
//消防炮
@Value("${equipment.plan.monitor}")
String monitorCodes;
@Autowired
private InfluxDbConnection influxDbConnection;
private Executor dataExecutor = new ThreadPoolTaskExecutor();
/**
* 泡沫罐KEY
*/
......@@ -379,22 +391,22 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
JSONObject jsonObject = JSONObject.parseObject(message);
String dataType = jsonObject.getString("datatype");
String indexAddress,value,timeStamp,quality = null;
String indexAddress, value, timeStamp, quality = null;
//如果消息是遥信类型,进行指标转换
if(dataType != null && dataType.equals("state")){
if (dataType != null && dataType.equals("state")) {
indexAddress = jsonObject.getString("scadaid");
value = jsonObject.getInteger("value") ==1 ? "true":"false";
value = jsonObject.getInteger("value") == 1 ? "true" : "false";
timeStamp = jsonObject.getString("timestamp");
}else{
} else {
indexAddress = jsonObject.getString("key");
value = jsonObject.getString("value");
timeStamp = jsonObject.getString("time_stamp");
quality = jsonObject.getString("quality");
}
EquipmentSpecificIndex equipmentSpeIndex = equipmentSpecificIndexService.getEquipmentSpeIndexByIndexAddress(indexAddress,null);
if (equipmentSpeIndex == null){
EquipmentSpecificIndex equipmentSpeIndex = equipmentSpecificIndexService.getEquipmentSpeIndexByIndexAddress(indexAddress, null);
if (equipmentSpeIndex == null) {
return;
}
equipmentSpeIndex.setValue(value);
......@@ -627,32 +639,110 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
@Override
@Transactional(rollbackFor = Exception.class)
public void handlerIotMessage(String topic, String message) {
Map<Object, Object> equipmentIndexVOMap = redisUtils.hmget(RedisKey.EQUIP_INDEX_ADDRESS);
dataExecutor.execute(new Runnable() {
@Override
public void run() {
log.info("接收到iot消息: {}", message);
JSONObject jsonObject = JSONObject.parseObject(message);
String dataType = jsonObject.getString("dataType");
String indexAddress = jsonObject.getString("address");
String traceId = jsonObject.getString("traceId");
String deviceCode = jsonObject.getString("deviceCode");
String gatewayId = jsonObject.getString("gatewayId");
String value = jsonObject.getString("value");
String key = indexAddress + "_" + gatewayId;
try {
if (equipmentIndexVOMap.get(key) != null) {
Map<String, String> tagsMap = new HashMap<>();
Map<String, Object> fieldsMap = new HashMap<>();
EquipmentIndexVO equipmentSpeIndex = (EquipmentIndexVO) equipmentIndexVOMap.get(key);
tagsMap.put("key", key);
String valueLabel = valueTranslate(value, equipmentSpeIndex.getValueEnum());
fieldsMap.put("traceId", traceId);
fieldsMap.put("address", indexAddress);
fieldsMap.put("value", value);
fieldsMap.put("valueLabel", valueLabel.equals("") ? value : valueLabel);
fieldsMap.put("gatewayId", gatewayId);
fieldsMap.put("dataType", dataType);
fieldsMap.put("equipmentId", equipmentSpeIndex.getEquipmentId());
fieldsMap.put("unit", equipmentSpeIndex.getUnitName());
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
fieldsMap.put("createdTime", simpleDateFormat.format(new Date()));
fieldsMap.put("equipmentIndex", JSON.toJSONString(equipmentSpeIndex));
//保存influxDB库
influxDbConnection.insert("iot_data", tagsMap, fieldsMap);
}
} catch (Exception e) {
log.error("Iot透传消息解析入库失败" + e.getMessage(), e);
} finally {
}
}
});
}
/**
* 废弃代码,暂时不用,后期删除
* @param topic 主题
* @param message 消息内容
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void handlerMqttIotMessage(String topic, String message) {
Map<String, String> tagsMap = new HashMap<>();
Map<String, Object> fieldsMap = new HashMap<>();
log.info("接收到iot消息: {}", message);
TopicEntityVo topicEntity = new TopicEntityVo();
topicEntity.setTopic(topic);
topicEntity.setMessage(message);
List<IotDataVO> iotDatalist = new ArrayList<>();
List<EquipmentSpecificIndex> equipmentSpecificIndexList = new ArrayList<>();
List<EquipmentSpecificAlarm> equipmentSpecificAlarms = new ArrayList<>();
List<IndexStateVo> indexStateList = new ArrayList<>();
JSONObject jsonObject = JSONObject.parseObject(message);
String dataType = jsonObject.getString("dataType");
String indexAddress = jsonObject.getString("address");
String traceId = jsonObject.getString("traceId");
String deviceCode = jsonObject.getString("deviceCode");
String gatewayId = jsonObject.getString("gatewayId");
String value;
//如果消息是遥信类型,进行指标转换
if(dataType != null && dataType.equals("state")){
value = jsonObject.getString("value");
}else{
value = jsonObject.getInteger("value") ==1 ? "true":"false";
}
EquipmentSpecificIndex equipmentSpeIndex = equipmentSpecificIndexService.getEquipmentSpeIndexByIndexAddress(indexAddress,gatewayId);
if (equipmentSpeIndex == null){
String value = jsonObject.getString("value");
String key = indexAddress + "_" + gatewayId;
Map<Object, Object> equipmentIndexVOMap = redisUtils.hmget(RedisKey.EQUIP_INDEX_ADDRESS);
if (equipmentIndexVOMap.get(key) != null) {
EquipmentIndexVO equipmentSpeIndex = (EquipmentIndexVO) equipmentIndexVOMap.get(key);
tagsMap.put("key", indexAddress + "_" + gatewayId);
String valueLabel = valueTranslate(value, equipmentSpeIndex.getValueEnum());
fieldsMap.put("traceId", traceId);
fieldsMap.put("address", indexAddress);
fieldsMap.put("value", value);
fieldsMap.put("valueLabel", valueLabel.equals("") ? value : valueLabel);
fieldsMap.put("gatewayId", gatewayId);
fieldsMap.put("dataType", dataType);
fieldsMap.put("equipmentId", equipmentSpeIndex.getEquipmentId());
// fieldsMap.put("equipmentIndexName", equipmentSpeIndex.getEquipmentIndexName());
// fieldsMap.put("equipmentIndexKey", equipmentSpeIndex.getEquipmentIndexKey());
// fieldsMap.put("isAlarm", equipmentSpeIndex.getIsAlarm().toString());
fieldsMap.put("unit", equipmentSpeIndex.getUnitName());
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
fieldsMap.put("createdTime", simpleDateFormat.format(new Date()));
fieldsMap.put("equipmentIndex", JSON.toJSONString(equipmentSpeIndex));
//保存influxDB库
influxDbConnection.insert("iot_data", tagsMap, fieldsMap);
}
EquipmentSpecificIndex equipmentSpeIndex = equipmentSpecificIndexService.getEquipmentSpeIndexByIndexAddress(indexAddress, gatewayId);
if (equipmentSpeIndex == null) {
return;
}
......@@ -664,10 +754,8 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
equipmentSpeIndex.setDataType(dataType);
equipmentSpeIndex.setTraceId(traceId);
equipmentSpeIndex.setUUID(UUIDUtils.getUUID());
IotDataVO iotDataVO = new IotDataVO();
iotDataVO.setKey(equipmentSpeIndex.getNameKey());
iotDataVO.setValue(value);
iotDatalist.add(iotDataVO);
//更新装备性能指标
//equipmentSpecificIndexService.updateById(equipmentSpeIndex);
QueryWrapper<EquipmentSpecific> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("id", equipmentSpeIndex.getEquipmentSpecificId());
......@@ -690,11 +778,6 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
topicEntity.setType(equipmentSpecificVo.getType());
topicEntity.setCode(equipmentSpecificVo.getCode());
//es存储数据
eSeqService.saveESEquiplistSpecificBySystemESVO(equipmentSpeIndex, String.valueOf(equipmentSpecificVo.getSystemId()), equipmentSpecificVo.getSystemName());
//更新装备性能指标
equipmentSpecificIndexService.updateById(equipmentSpeIndex);
// 更新设备表指标状态
iEquipmentSpecificSerivce.updateEquipmentSpecIndexRealtimeData(equipmentSpeIndex);
......@@ -705,109 +788,11 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
// 添加指标报告
saveEquipmentAlarmReportDay(equipmentSpeIndex);
// 火眼数据构造告警指标逻辑
equipmentSpeIndex = handleTemperatureAlarm(equipmentSpeIndex, iotDatalist);
boolean alarmFlag = false;
Map<String, String> messageBodyMap = new HashMap<>();
//管网压力、泡沫罐信息、水箱液位告警处理
if (iotDataVO.getKey().toLowerCase().equals(CAFS_FoamTank_FoamTankLevel.toLowerCase()) ||
FHS_PipePressureDetector_PipePressure.toLowerCase().equals(iotDataVO.getKey().toLowerCase()) ||
iotDataVO.getKey().toLowerCase().equals(CAFS_WaterTank_WaterTankLevel.toLowerCase())) {
alarmFlag = doFoamTankLevel(iotDataVO, equipmentSpeIndex, messageBodyMap);
}
//消防水池液位处理
if (iotDataVO.getKey().toLowerCase().equals(FHS_FirePoolDevice_WaterLevel.toLowerCase()) ||
iotDataVO.getKey().toLowerCase().equals(FHS_WirelessliquidDetector_WaterLevel.toLowerCase())) {
alarmFlag = doWaterPoolLevel(iotDataVO, equipmentSpeIndex, messageBodyMap);
}
// 遥测数据生成告警事件、日志处理
if (iotDataVO.getKey().toLowerCase().equals(CAFS_FoamTank_FoamTankLevel.toLowerCase()) ||
FHS_PipePressureDetector_PipePressure.toLowerCase().equals(iotDataVO.getKey().toLowerCase()) ||
iotDataVO.getKey().toLowerCase().equals(CAFS_WaterTank_WaterTankLevel.toLowerCase()) ||
iotDataVO.getKey().toLowerCase().equals(FHS_FirePoolDevice_WaterLevel.toLowerCase()) ||
iotDataVO.getKey().toLowerCase().equals(FHS_WirelessliquidDetector_WaterLevel.toLowerCase())) {
handlingAlarms(equipmentSpeIndex, alarmFlag);
}
// 指标告警处理
if (equipmentSpeIndex.getIsAlarm() != null && 1 == equipmentSpeIndex.getIsAlarm()) {
equipmentSpecificAlarms.addAll(createIndexAlarmRecord(equipmentSpeIndex, messageBodyMap));
}
// 遥测遥信数据推送云端kafka
JSONObject jsonObjectXf = new JSONObject();
jsonObjectXf.put("data_class", "realdata");
if (equipmentSpeIndex.getIsTrend() == 1) {
jsonObjectXf.put("data_type", "analog");
} else {
jsonObjectXf.put("data_type", "state");
}
String date = DateUtils.date2LongStr(new Date());
jsonObjectXf.put("op_type", "subscribe_emergency");
JSONObject jsonObjectCondition = new JSONObject();
jsonObjectCondition.put("station_psr_id", stationCode);
jsonObjectCondition.put("station_name", stationName);
jsonObjectCondition.put("data_upload_time", date);
jsonObjectXf.put("condition", jsonObjectCondition);
JSONObject jsonObjectData = new JSONObject();
jsonObjectData.put("psrId", stationCode);
jsonObjectData.put("astId", equipmentSpeIndex.getSpecificCode());
jsonObjectData.put("equipType", equipmentSpeIndex.getEquipmentCode());
jsonObjectData.put("name", equipmentSpeIndex.getEquipmentSpecificName() + "-" + equipmentSpeIndex.getEquipmentSpecificIndexName());
if (value.equals("true")) {
jsonObjectData.put("value", "1");
} else if (value.equals("false")) {
jsonObjectData.put("value", "0");
} else {
jsonObjectData.put("value", value);
}
jsonObjectData.put("measurementType", null == equipmentSpeIndex.getEquipmentIndexKey() ? "" : equipmentSpeIndex.getEquipmentIndexKey());
jsonObjectData.put("dateTime", date);
jsonObjectData.put("quality", "0"); // 量测质量码:0 有效,1 无效
List<JSONObject> jsonObjects = Arrays.asList(jsonObjectData);
jsonObjectXf.put("data", jsonObjects);
// 遥测
if (!isOpenTelemetering && equipmentSpeIndex.getIsTrend() == 1) {
} else {
try {
emqKeeper.getMqttClient().publish("emq.xf.created", jsonObjectXf.toString().getBytes(), 1, false);
log.info("遥测遥信数据推送云端kafka成功");
} catch (MqttException e) {
log.error("遥测遥信数据推送云端kafka失败=====>" + e.getMessage());
e.printStackTrace();
}
}
// 报警数据保存
List<EquipmentSpecificAlarmLog> alarmLogs = new ArrayList<>();
if (!ObjectUtils.isEmpty(equipmentSpecificAlarms)) {
equipmentSpecificAlarmService.saveOrUpdateBatch(equipmentSpecificAlarms);
}
// 需要在事务提交之后,否则事务隔离查询不出数据
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
equipmentSpecificAlarms.forEach(action -> {
if (AlarmStatusEnum.BJ.getCode() == action.getStatus()) {
alarmLogs.add(addEquipAlarmLogRecord(action));
if (ValidationUtil.isEmpty(action.getAlamContent())) {
action.setAlamContent(action.getEquipmentSpecificName() + action.getEquipmentSpecificIndexName());
}
mqttSendGateway.sendToMqtt(TopicEnum.EQDQR.getTopic(), JSONArray.toJSON(action).toString());
} else {
alarmLogs.addAll(upAlarmLogStatus(action.getIotCode(), action.getEquipmentSpecificIndexKey(), action.getTraceId(),
equipmentSpecificAlarmLogService, false));
mqttSendGateway.sendToMqtt(TopicEnum.EQYQR.getTopic(), JSONArray.toJSON(action).toString());
bool = Boolean.TRUE;
}
});
// 直流中心消息推送刷新
publishDataToDCCenterPage(equipmentSpecificIndexList);
......@@ -835,47 +820,6 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
// 向画布推送
publishDataToCanvas(equipmentSpecificIndexList);
// 向其他系统推送报警
equipmentAlarmLogsToOtherSystems(alarmLogs);
if (equipmentSpecificVo.getEcode() != null) {
String ecode = equipmentSpecificVo.getEcode();
boolean flag = false;
//消防泵
String[] strings = pumpCodes.split(",");
for (String string : strings) {
if (ecode.startsWith(string)) {
//通知>消防应急预案
topicEntity.setType("xfb");
mqttSendGateway.sendToMqtt(emergencyDisposalIndicators, JSONObject.toJSONString(topicEntity));
flag = true;
break;
}
}
// 消防炮
String[] stringxfp = monitorCodes.split(",");
if (!flag) {
for (String string1 : stringxfp) {
if (ecode.startsWith(string1)) {
//通知>消防应急预案
topicEntity.setType("xfp");
mqttSendGateway.sendToMqtt(emergencyDisposalIndicators, JSONObject.toJSONString(topicEntity));
flag = true;
break;
}
}
}
//消防水源
if (!flag) {
List<Map> lit = iEquipmentSpecificSerivce.getWater(equipmentSpecificVo.getId());
if (lit != null && lit.size() > 0) {
topicEntity.setType("xfsy");
mqttSendGateway.sendToMqtt(emergencyDisposalIndicators, JSONObject.toJSONString(topicEntity));
}
}
}
}
}
});
......@@ -1076,8 +1020,8 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
} else {
//恢复后修改 wl_equip_qrcode_record中对应记录
LambdaQueryWrapper<EquipQrcodeRecord> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(EquipQrcodeRecord::getEquipid,action.getEquipmentSpecificId());
wrapper.eq(EquipQrcodeRecord::getSourceId,action.getId());
wrapper.eq(EquipQrcodeRecord::getEquipid, action.getEquipmentSpecificId());
wrapper.eq(EquipQrcodeRecord::getSourceId, action.getId());
EquipQrcodeRecord equipQrcodeRecord = equipQrcodeRecordMapper.selectOne(wrapper);
equipQrcodeRecord.setCleanTime(new Date());
equipQrcodeRecordMapper.updateById(equipQrcodeRecord);
......@@ -1089,13 +1033,15 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
mqttSendGateway.sendToMqtt(TopicEnum.EQYQR.getTopic(), JSONArray.toJSON(action).toString());
bool = Boolean.TRUE;
//查询二维码事件记录表中该设备的历史数据
LambdaQueryWrapper<EquipQrcodeRecord> query = new LambdaQueryWrapper<>();
query.eq(EquipQrcodeRecord::getEquipid,action.getEquipmentSpecificId());
query.eq(EquipQrcodeRecord::getEquipid, action.getEquipmentSpecificId());
query.isNull(EquipQrcodeRecord::getCleanTime);
List<EquipQrcodeRecord> equipQrcodeRecords = equipQrcodeRecordMapper.selectList(query);
if (equipQrcodeRecords.size() > 0){ //如果记录表中还存在未消除的巡检故障 则先修改为黄码
String status = equipQrcodeRecords.stream().sorted(Comparator.comparing(EquipQrcodeRecord::getStatus)).findFirst().get().getStatus();
if (equipQrcodeRecords.size() > 0) { //如果记录表中还存在未消除的巡检故障 则先修改为黄码
iEquipmentSpecificSerivce.updateEquipSpecificStatus(equipQrcodeRecords.get(0).getStatus(), String.valueOf(specific.getId()));
}else {
} else {
iEquipmentSpecificSerivce.updateEquipSpecificStatus(specific.getEquipStatus(), String.valueOf(specific.getId()));
}
......@@ -1112,7 +1058,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
// 触发风险---> 站端发送消息到Message服务
publishDataToMessage(equipmentSpecificIndexList, isAlarm);
if("zd".equals(system)){
if ("zd".equals(system)) {
System.out.println("站端系统----------------");
// 向预控系统发送消息
......@@ -1133,47 +1079,47 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
// 向画布推送
publishDataToCanvas(equipmentSpecificIndexList);
// 向其他系统推送报警
equipmentAlarmLogsToOtherSystems(alarmLogs);
if(vo.getEcode()!=null){
String ecode= vo.getEcode();
boolean flag=false;
//消防泵
String[] strings = pumpCodes.split(",");
for (String string : strings) {
if(ecode.startsWith(string)){
//通知>消防应急预案
topicEntity.setType("xfb");
mqttSendGateway.sendToMqtt(emergencyDisposalIndicators, JSONObject.toJSONString(topicEntity));
flag=true;
break;
}
}
// 消防炮
String[] stringxfp = monitorCodes.split(",");
if(!flag){
for (String string1 : stringxfp) {
if(ecode.startsWith(string1)){
//通知>消防应急预案
topicEntity.setType("xfp");
mqttSendGateway.sendToMqtt(emergencyDisposalIndicators, JSONObject.toJSONString(topicEntity));
flag=true;
break;
}
}
}
//消防水源
if(!flag){
List<Map> lit= iEquipmentSpecificSerivce.getWater(vo.getId());
if(lit!=null&& lit.size()>0){
topicEntity.setType("xfsy");
mqttSendGateway.sendToMqtt(emergencyDisposalIndicators, JSONObject.toJSONString(topicEntity));
}
}
}
// 向其他系统推送报警
equipmentAlarmLogsToOtherSystems(alarmLogs);
if (vo.getEcode() != null) {
String ecode = vo.getEcode();
boolean flag = false;
//消防泵
String[] strings = pumpCodes.split(",");
for (String string : strings) {
if (ecode.startsWith(string)) {
//通知>消防应急预案
topicEntity.setType("xfb");
mqttSendGateway.sendToMqtt(emergencyDisposalIndicators, JSONObject.toJSONString(topicEntity));
flag = true;
break;
}
}
// 消防炮
String[] stringxfp = monitorCodes.split(",");
if (!flag) {
for (String string1 : stringxfp) {
if (ecode.startsWith(string1)) {
//通知>消防应急预案
topicEntity.setType("xfp");
mqttSendGateway.sendToMqtt(emergencyDisposalIndicators, JSONObject.toJSONString(topicEntity));
flag = true;
break;
}
}
}
//消防水源
if (!flag) {
List<Map> lit = iEquipmentSpecificSerivce.getWater(vo.getId());
if (lit != null && lit.size() > 0) {
topicEntity.setType("xfsy");
mqttSendGateway.sendToMqtt(emergencyDisposalIndicators, JSONObject.toJSONString(topicEntity));
}
}
}
}
}
});
......@@ -2599,4 +2545,32 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
}
}
}
@PostConstruct
public void iotAsyncExecutor() {
ThreadPoolTaskExecutor workExecutor = new ThreadPoolTaskExecutor();
// 设置核心线程数
int length = Runtime.getRuntime().availableProcessors();
int size = Math.max(length, 80);
workExecutor.setCorePoolSize(size * 2);
log.info("装备服务初始化,系统线程数:{},运行线程数:{}", length, size);
// 设置最大线程数
workExecutor.setMaxPoolSize(workExecutor.getCorePoolSize());
//配置队列大小
workExecutor.setQueueCapacity(Integer.MAX_VALUE);
// 设置线程活跃时间(秒)
workExecutor.setKeepAliveSeconds(60);
// 设置默认线程名称
workExecutor.setThreadNamePrefix("装备服务-Iot透传消息消费线程池" + "-");
// 等待所有任务结束后再关闭线程池
//当调度器shutdown被调用时,等待当前被调度的任务完成
workExecutor.setWaitForTasksToCompleteOnShutdown(true);
//执行初始化
workExecutor.initialize();
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
workExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
this.dataExecutor = workExecutor;
}
}
......@@ -12,6 +12,7 @@ import com.yeejoin.equipmanage.service.IWlCarMileageService;
import com.yeejoin.equipmanage.utils.CarUtils;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.jfree.util.Log;
import org.typroject.tyboot.component.emq.EmqKeeper;
import org.typroject.tyboot.core.restful.utils.ResponseModel;
......@@ -50,6 +51,7 @@ public class ThreadCar extends Thread {
JSONObject lastObj = null;
WlCarMileage last = null;
try {
Log.info("-------------------------" + this.topic + "结束坐标开始计时------------------------------");
this.sleep(clippingTime);
//业务处理
//如果十分钟没有坐标,则需要设置结束标记
......@@ -84,18 +86,20 @@ public class ThreadCar extends Thread {
// JSONObject.parseObject(JSONObject.toJSONString(list.get(list.size() - 1)));
if (lastObj == null) {
lastObj = new JSONObject();
lastObj.put("FireCar_Longitude", 0.0);
lastObj.put("FireCar_Latitude", 0.0);
lastObj.put("FireCar_Longitude", last.getStartLongitude());
lastObj.put("FireCar_Latitude", last.getEndLatitude());
lastObj.put("time", 0);
lastObj.put("FireCar_Speed", 0);
}
double endLongitude = lastObj.getDoubleValue("FireCar_Longitude");
double endLatitude = lastObj.getDoubleValue("FireCar_Latitude");
// 230215180624
// Date endTime =UTCToCST(lastObj.getString("time"));
Date endTime = new Date(jsonObject.getLong("time"));
Date endTime = new Date();
//实时库中的时间虽然坐标与记录的一致,但是更新时间可能最新,故可能会有记录的结束时间早于开始时间
long takeTime = (endTime.getTime() / 1000 * 1000) - (last.getStartTime().getTime() / 1000 * 1000);
if(takeTime<0){
takeTime = 0-takeTime;
}
last.setEndLongitude(endLongitude);
last.setEndLatitude(endLatitude);
last.setEndTime(endTime);
......@@ -113,7 +117,7 @@ public class ThreadCar extends Thread {
}
last.setTravel(new BigDecimal(travel / 1000).setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue());
iWlCarMileageService.updateById(last);
System.out.println("============================================================更新结束坐标成功==========:"+topic);
Log.info("============================================================更新结束坐标成功==========:"+topic);
this.interrupt();
}
} catch (Exception exception) {
......
package com.yeejoin.equipmanage.thread;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.yeejoin.equipmanage.common.entity.Car;
import com.yeejoin.equipmanage.common.entity.WlCarMileage;
import com.yeejoin.equipmanage.common.utils.CoordinateUtil;
import com.yeejoin.equipmanage.fegin.IotFeign;
import com.yeejoin.equipmanage.service.ICarService;
import com.yeejoin.equipmanage.service.IWlCarMileageService;
import com.yeejoin.equipmanage.service.impl.CarServiceImpl;
import com.yeejoin.equipmanage.service.impl.WlCarMileageServiceImpl;
import com.yeejoin.equipmanage.utils.CarUtils;
import liquibase.pro.packaged.E;
import org.jfree.util.Log;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.typroject.tyboot.component.emq.EmqKeeper;
import org.typroject.tyboot.core.restful.utils.ResponseModel;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@Component
public class ThreadCarMileageTreatment extends Thread {
@Autowired
private WlCarMileageServiceImpl wlCarMileageServiceImpl;
@Autowired
CarServiceImpl carServiceImpl;
@Autowired
private IotFeign iotFeign;
@Value("${mileage.clippingtime}")
private Long clipping_time;
@Override
public void run() {
Log.info("----------------------------------------------------开始处理未结束里程---------------------------------");
HashMap<String, String> hashMap = new HashMap<>();
//toDo
WlCarMileage last = null;
JSONObject lastObj = null;
Car car =null;
List<WlCarMileage> wlCarMileageList = wlCarMileageServiceImpl.list(new QueryWrapper<WlCarMileage>().isNull("end_time"));
for (int i = 0; i < wlCarMileageList.size(); i++) {
car = carServiceImpl.getOne(new QueryWrapper<Car>().eq("iot_code", wlCarMileageList.get(i).getIotCode()));
String coordinateSting = String.valueOf(car.getLongitude()) + String.valueOf(car.getLatitude());
hashMap.put(car.getIotCode(), coordinateSting);
}
try {
Thread.sleep(clipping_time);
for (int i = 0; i < wlCarMileageList.size(); i++) {
WlCarMileage wlCarMileage =wlCarMileageList.get(i);
car = carServiceImpl.getOne(new QueryWrapper<Car>().eq("iot_code", wlCarMileage.getIotCode()));
String coordinateSting = String.valueOf(car.getLongitude()) + String.valueOf(car.getLatitude());
if (coordinateSting.equals(hashMap.get(car.getIotCode()))) {
String iotCode = car.getIotCode();
String measurement = "0THMcLKR";
String deviceName = iotCode.replace(measurement, "");
last = wlCarMileageServiceImpl
.getOne(new LambdaQueryWrapper<WlCarMileage>().eq(WlCarMileage::getIotCode, iotCode)
.isNull(WlCarMileage::getEndLongitude).isNull(WlCarMileage::getEndLatitude)
.orderByDesc(WlCarMileage::getStartTime).last("limit 1"));
ResponseModel<List<Object>> result = iotFeign.getLiveData(measurement, deviceName,
last.getStartTime(), new Date(new Date().getTime() + 2000));
List<Object> list = result.getResult();
if (list != null && list.size() > 0) {
// 过滤空坐标
List<Object> filterList = new ArrayList<Object>();
for (int j = 0; j < list.size(); j++) {
JSONObject Obj = JSONObject.parseObject(JSONObject.toJSONString(list.get(j)));
if (Obj.get("FireCar_Longitude") != null && Obj.get("FireCar_Latitude") != null
&& Obj.getDoubleValue("FireCar_Longitude") != 0
&& Obj.getDoubleValue("FireCar_Latitude") != 0) {
filterList.add(list.get(j));
// 获取第一个不为空的坐标
if (lastObj == null) {
lastObj = Obj;
}
}
}
Log.info("----------------------------------------lastobj----------------------"+lastObj.toJSONString());
if (lastObj == null) {
lastObj = new JSONObject();
lastObj.put("FireCar_Longitude", last.getStartLongitude());
lastObj.put("FireCar_Latitude", last.getEndLatitude());
lastObj.put("time", 0);
lastObj.put("FireCar_Speed", 0);
}
double endLongitude = lastObj.getDoubleValue("FireCar_Longitude");
double endLatitude = lastObj.getDoubleValue("FireCar_Latitude");
Date endTime = new Date();
long takeTime = (endTime.getTime() / 1000 * 1000) - (last.getStartTime().getTime() / 1000 * 1000);
if(takeTime<0){
takeTime = 0-takeTime;
}
last.setEndLongitude(endLongitude);
last.setEndLatitude(endLatitude);
last.setEndTime(endTime);
last.setEndName(CarUtils.getAddress(endLongitude, endLatitude));
last.setEndSpeed(lastObj.getIntValue("FireCar_Speed"));
last.setTakeTime(takeTime);
double travel = 0.0;
// 获取里程
for (int k = 0; k < filterList.size() - 1; k++) {
JSONObject start = JSONObject.parseObject(JSONObject.toJSONString(filterList.get(k)));
JSONObject end = JSONObject.parseObject(JSONObject.toJSONString(filterList.get(k + 1)));
travel += CoordinateUtil.distance(start.getDoubleValue("FireCar_Latitude"),
start.getDoubleValue("FireCar_Longitude"), end.getDoubleValue("FireCar_Latitude"),
end.getDoubleValue("FireCar_Longitude"));
}
last.setTravel(new BigDecimal(travel / 1000).setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue());
Log.info("----------------------------------------last----------------------"+lastObj.toJSONString());
wlCarMileageServiceImpl.updateById(last);
}
}
}
} catch (Exception exception) {
exception.printStackTrace();
}
}
}
......@@ -3,6 +3,7 @@ package com.yeejoin;
import com.yeejoin.amos.boot.biz.common.utils.oConvertUtils;
import com.yeejoin.equipmanage.listener.CarIotListener;
import com.yeejoin.equipmanage.listener.CarIotNewListener;
import com.yeejoin.equipmanage.thread.ThreadCarMileageTreatment;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.mybatis.spring.annotation.MapperScan;
import org.slf4j.Logger;
......@@ -54,6 +55,10 @@ public class AmostEquipApplication {
@Autowired
private CarIotNewListener carIotNewListener;
@Autowired
private ThreadCarMileageTreatment threadCarMileageTreatment;
public static void main(String[] args) throws UnknownHostException {
ConfigurableApplicationContext context = SpringApplication.run(AmostEquipApplication.class, args);
......@@ -82,4 +87,9 @@ public class AmostEquipApplication {
void initMqtt() throws MqttException {
emqKeeper.getMqttClient().subscribe("+/+/property", 1, carIotNewListener);
}
//江西电建服务重启后对于未计时且未结束的里程的进行处理
@Bean
void initCarMelige() {
threadCarMileageTreatment.start();
}
}
......@@ -139,4 +139,14 @@ mileage.clippingtime=600000
equip.car.alarmBattery= 10
equip.car.maxTravel=400
equip.risk.model.topic=""
#mileage.segmentation.cron= 0 */2 * * * ?
\ No newline at end of file
#mileage.segmentation.cron= 0 */2 * * * ?
# influxDB
spring.influx.url=http://172.16.11.201:8086
spring.influx.password=Yeejoin@2020
spring.influx.user=root
spring.influx.database=iot_platform
spring.influx.retention_policy=default
spring.influx.retention_policy_time=30d
spring.influx.actions=10000
spring.influx.bufferLimit=20000
\ No newline at end of file
......@@ -9,7 +9,7 @@ spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
#mybatis mapper file
mybatis.mapper-locations=classpath:mapper/*.xml
#mybatis-plus
mybatis-plus.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl
mybatis-plus.configuration.log-impl=org.apache.ibatis.logging.slf4j.Slf4jImpl
# mybatis entity package
mybatis.type-aliases-package=com.yeejoin.equipmanage.common.entity
spring.jackson.time-zone=GMT+8
......
......@@ -131,18 +131,17 @@
<select id="getEquipmentSpeIndexDataByIotCode"
resultType="com.yeejoin.equipmanage.common.entity.EquipmentSpecificIndex">
SELECT wesi.id AS id,
wei.name_key AS nameKey,
IFNULL(si.value_label, si.`value`) AS 'value',
wesi.equipment_specific_id AS equipmentSpecificId,
wesi.equipment_index_id AS equipmentIndexId,
wes.org_code AS code,
wes.iot_code AS iotCode,
wes.org_code AS orgCode,
wei.type_code AS typeCode,
wei.name AS indexName,
wei.unit AS indexUnitName,
wei.value_enum AS valueEnum
SELECT wesi.id AS id,
wei.name_key AS nameKey,
IFNULL(si.value_label, si.`value`) AS 'value', wesi.equipment_specific_id AS equipmentSpecificId,
wesi.equipment_index_id AS equipmentIndexId,
wes.org_code AS code,
wes.iot_code AS iotCode,
wes.org_code AS orgCode,
wei.type_code AS typeCode,
wei.name AS indexName,
wei.unit AS indexUnitName,
wei.value_enum AS valueEnum
FROM wl_equipment_specific_index AS wesi
LEFT JOIN wl_equipment_specific AS wes ON wes.id = wesi.equipment_specific_id
LEFT JOIN wl_equipment_index AS wei ON wei.id = wesi.equipment_index_id
......@@ -175,31 +174,29 @@
WHERE wlves.equipment_specific_id = #{id}
</select>
<select id="getEquipmentDetailBySecificId" resultMap="EquipmentDetail">
SELECT wled.NAME equipment_name,
wle.id equip_id,
wled.id equip_detail_id,
SELECT wled.NAME equipment_name,
wle.id equip_id,
wled.id equip_detail_id,
wles.qr_code,
wles.CODE,
(
select GROUP_CONCAT(fs.name)
from f_fire_fighting_system fs
where FIND_IN_SET(fs.id, wles.system_id)
) as System_name,
wlec.NAME category_name,
wlws.full_name warehouse_name,
(select GROUP_CONCAT(fs.name)
from f_fire_fighting_system fs
where FIND_IN_SET(fs.id, wles.system_id)) as System_name,
wlec.NAME category_name,
wlws.full_name warehouse_name,
wled.standard,
sd.`name` country,
sd.`name` country,
wled.remark,
wled.maintenance_cycle,
wled.is_import,
wled.brand,
wlun.`name` unit_name,
wlmi.`name` manufacturer_name,
wlun.`name` unit_name,
wlmi.`name` manufacturer_name,
wlmi.service_tel,
wlmi.sales_tel,
wlmi.address,
wlmi.img,
wleias.`value` STATUS
wleias.`value` STATUS
FROM wl_equipment_specific wles
LEFT JOIN wl_equipment_detail wled ON wles.equipment_detail_id = wled.id
LEFT JOIN wl_system_dic sd ON sd.id = wled.country
......@@ -209,13 +206,11 @@
LEFT JOIN wl_warehouse_structure wlws ON wlsd.warehouse_structure_id = wlws.id
LEFT JOIN wl_unit wlun ON wle.unit_id = wlun.id
LEFT JOIN wl_manufacturer_info wlmi ON wlmi.id = wled.manufacturer_id
LEFT JOIN (
SELECT wlei.equipment_id,
wlesi.`value`
FROM wl_equipment_index wlei
LEFT JOIN wl_equipment_specific_index wlesi ON wlei.id = wlesi.equipment_index_id
WHERE wlei.name_key = 'runState'
) wleias ON wle.id = wleias.equipment_id
LEFT JOIN (SELECT wlei.equipment_id,
wlesi.`value`
FROM wl_equipment_index wlei
LEFT JOIN wl_equipment_specific_index wlesi ON wlei.id = wlesi.equipment_index_id
WHERE wlei.name_key = 'runState') wleias ON wle.id = wleias.equipment_id
WHERE wles.id = #{id}
</select>
<select id="selectEquProperty" resultType="java.util.HashMap">
......@@ -329,10 +324,13 @@
ei.`name` AS perfQuotaName,
si.`value`,
ei.is_iot,
ei.unit AS unitName,
si.unit AS unitName,
ei.sort_num,
si.create_date,
si.update_date
si.update_date,
si.index_address,
si.gateway_id,
si.data_type
FROM
wl_equipment_specific_index si
LEFT JOIN wl_equipment_index ei ON si.equipment_index_id = ei.id
......@@ -428,10 +426,10 @@
esi.equipment_index_name AS equipmentSpecificIndexName,
IF(
esi.value_label = ''
esi.value_label = ''
OR esi.value_label IS NULL,
esi.`value`,
esi.value_label
esi.`value`,
esi.value_label
) AS valueLabel,
es.position AS location,
esi.update_date AS createDate
......@@ -475,21 +473,21 @@
</select>
<select id="getEquipSpecificScrap" resultType="java.util.Map">
select wes.id,
str.name as sname,
wlsd.status,
wes.position,
wes.name,
we.expiry_date as weExpiry,
<!-- wed.expiry_date as wesExpiry,-->
wed.area as area,
wed.production_date as product
str.name as sname,
wlsd.status,
wes.position,
wes.name,
we.expiry_date as weExpiry,
<!-- wed.expiry_date as wesExpiry,-->
wed.area as area,
wed.production_date as product
from wl_equipment_specific wes
left join wl_equipment_detail wed on wes.equipment_detail_id = wed.id
left join wl_equipment we on wed.equipment_id = we.id
left join wl_warehouse_structure str on str.id = wes.warehouse_structure_id
left join wl_stock_detail wlsd on wes.id = wlsd.equipment_specific_id
left join wl_equipment_detail wed on wes.equipment_detail_id = wed.id
left join wl_equipment we on wed.equipment_id = we.id
left join wl_warehouse_structure str on str.id = wes.warehouse_structure_id
left join wl_stock_detail wlsd on wes.id = wlsd.equipment_specific_id
where wed.production_date is not null
and wlsd.status != 7
and wlsd.status != 7
</select>
<select id="getEquipIndexInIndex" resultType="com.yeejoin.equipmanage.common.entity.EquipmentSpecificIndex">
SELECT
......@@ -508,56 +506,58 @@
LEFT JOIN wl_equipment_detail wled ON es.equipment_detail_id = wled.id
<where>
<if test="list != null and list.size > 0 and type = 'id'">
si.equipment_index_key IN
si.equipment_index_key IN
<foreach collection="list" item="item" index="index" open="(" close=")" separator=",">
#{item}
</foreach>
</if>
and es.iot_code is not null
and es.iot_code != ''
</where>
</select>
<!-- 根据信号索引查询装备性能指标 -->
<select id="getEquipmentSpeIndexByIndexAddress"
resultType="com.yeejoin.equipmanage.common.entity.EquipmentSpecificIndex">
SELECT wesi.id AS id,
wei.name_key AS nameKey,
wesi.value AS value,
wesi.equipment_specific_id AS equipmentSpecificId,
wesi.equipment_index_id AS equipmentIndexId,
wesi.equipment_index_name AS equipmentIndexName,
wesi.equipment_index_key AS equipmentIndexKey,
wesi.value_label AS valueLabel,
wei.type_code AS typeCode,
wei.type_name AS typeName,
wei.name AS indexName,
wei.unit AS indexUnitName,
wes.org_code AS orgCode,
ed.`name` AS equipmentSpecificName,
ed.equipment_name AS equipmentName,
wes.iot_code AS iotCode,
wes.code AS specificCode,
wei.`name` AS equipmentSpecificIndexName,
wei.`value_enum` AS valueEnum,
wei.is_trend AS isTrend,
wes.qr_code AS qrCode,
wesi.update_date AS updateDate,
ed.code AS equipmentCode,
ed.equipment_id AS equipmentId,
ed.id AS equipmentDetailId,
wes.code as equipmentSpecificCode,
wes.system_id as systemId,
wesi.is_alarm as isAlarm,
wesi.emergency_level_color as emergencyLevelColor,
wesi.emergency_level as emergencyLevel,
wesi.emergency_level_describe as emergencyLevelDescribe,
wes.biz_org_name AS bizOrgName,
wes.biz_org_code AS bizOrgCode
SELECT wesi.id AS id,
wei.name_key AS nameKey,
wesi.value AS value,
wesi.equipment_specific_id AS equipmentSpecificId,
wesi.equipment_index_id AS equipmentIndexId,
wesi.equipment_index_name AS equipmentIndexName,
wesi.equipment_index_key AS equipmentIndexKey,
wesi.value_label AS valueLabel,
wei.type_code AS typeCode,
wei.type_name AS typeName,
wei.name AS indexName,
wesi.unit AS indexUnitName,
wes.org_code AS orgCode,
ed.`name` AS equipmentSpecificName,
ed.equipment_name AS equipmentName,
wes.iot_code AS iotCode,
wes.code AS specificCode,
wei.`name` AS equipmentSpecificIndexName,
wei.`value_enum` AS valueEnum,
wei.is_trend AS isTrend,
wes.qr_code AS qrCode,
wesi.update_date AS updateDate,
ed.code AS equipmentCode,
ed.equipment_id AS equipmentId,
ed.id AS equipmentDetailId,
wes.code as equipmentSpecificCode,
wes.system_id as systemId,
wesi.is_alarm as isAlarm,
wesi.emergency_level_color as emergencyLevelColor,
wesi.emergency_level as emergencyLevel,
wesi.emergency_level_describe as emergencyLevelDescribe,
wes.biz_org_name AS bizOrgName,
wes.biz_org_code AS bizOrgCode
FROM wl_equipment_specific_index AS wesi
LEFT JOIN wl_equipment_specific AS wes ON wes.id = wesi.equipment_specific_id
LEFT JOIN wl_equipment_detail ed ON ed.id = wes.equipment_detail_id
LEFT JOIN wl_equipment_index AS wei ON wei.id = wesi.equipment_index_id
WHERE
wesi.index_address = #{indexAddress}
wesi.index_address = #{indexAddress}
<if test="gatewayId != null">
AND wesi.gateway_id = #{gatewayId}
</if>
......
......@@ -3870,5 +3870,16 @@
</sql>
</changeSet>
</databaseChangeLog>
<changeSet author="ltw" id="20230614-ltw-01">
<preConditions onFail="MARK_RAN">
<not>
<columnExists tableName="cb_organization_user" columnName="post_name"/>
</not>
</preConditions>
<comment>modify table cb_organization_user modify columns</comment>
<sql>
ALTER TABLE `cb_organization_user` MODIFY `post_name` varchar(4000) DEFAULT NULL COMMENT '岗位名称'
</sql>
</changeSet>
</databaseChangeLog>
package com.yeejoin.amos;
import com.yeejoin.amos.message.kafka.KafkaProducerService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class AmosBootUtilsMessageApplicationTests {
@Autowired
private KafkaProducerService kafkaProducerService;
@Test
void contextLoads() {
String msg = "hello";
kafkaProducerService.sendMessageAsync("akka.iot.created",msg);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment