Commit 733e11e1 authored by wujiang's avatar wujiang

提交

parent 23b46eaf
......@@ -53,6 +53,7 @@ public class OpenapiBizTokenService extends BaseService<OpenapiBizTokenModel, Op
bizTokenModel.setApiCompanyCode(openapiBizToken.getApiCompanyCode());
RequestContext.setProduct(openapiBizToken.getProduct());
RequestContext.setAppKey(openapiBizToken.getAppKey());
System.out.println("================================openapi");
FeignClientResult<HashMap<String, Object>> responseModel = Privilege.authClient.idpassword(idPasswordAuthModel);
HashMap<String, Object> authModel = responseModel.getResult();
String token = (String) authModel.get("token");
......
......@@ -90,6 +90,7 @@ public class CompanyAndPersonTask {
IdPasswordAuthModel authModel = new IdPasswordAuthModel();
authModel.setLoginId(user);
authModel.setPassword(DesUtil.encode(password, secretKey));
System.out.println("================================companyandperson");
FeignClientResult<Map<String, String>> authResult = Privilege.authClient.idpassword(authModel);
String token = authResult.getResult().get("token");
RequestContext.setToken(token);
......
......@@ -41,6 +41,7 @@ public class StartPlatformTokenService {
authModel.setPassword(DesUtil.encode(password, secretKey));
System.out.println("user:" + user);
System.out.println("secretKey:" + secretKey);
System.out.println("================================startPlatform");
FeignClientResult<Map<String, String>> authResult = Privilege.authClient.idpassword(authModel);
String token = authResult.getResult().get("token");
System.out.println("token:" + token);
......
......@@ -285,6 +285,7 @@ public class PlatformUtils {
authModel.setPassword(DesUtil.encode(password, secretKey));
System.out.println("user:" + user);
System.out.println("secretKey:" + secretKey);
System.out.println("================================getSyncPlatformUser");
FeignClientResult<Map<String, String>> authResult = Privilege.authClient.idpassword(authModel);
String token = authResult.getResult().get("token");
System.out.println("token:" + token);
......
......@@ -52,6 +52,7 @@ public class AmosAuth {
arg0.setPassword(DesUtil.encode(password, "qaz"));
RequestContext.setProduct(product);
RequestContext.setAppKey(appKey);
System.out.println("================================AmosAuth");
FeignClientResult o = privilege.authClient.idpassword(arg0);
model = (HashMap<String, Object>) o.getResult();
flag = false;
......
package com.yeejoin.equipmanage.quartz;
import com.yeejoin.equipmanage.common.enums.AnalysisReportEnum;
import com.yeejoin.equipmanage.common.utils.DateUtils;
import com.yeejoin.equipmanage.service.IAnalysisReportLogService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.text.ParseException;
import java.util.Date;
/**
* @author DELL
*/
@Component
@EnableScheduling
public class AnalysisReportSchedulerJob {
@Autowired
private IAnalysisReportLogService iAnalysisReportLogService;
private final Logger log = LoggerFactory.getLogger(AnalysisReportSchedulerJob.class);
/**
* 每天凌晨0点-日报生成
*/
@Scheduled(cron = "0 0 0 * * ?")
public void dayReport() throws ParseException {
Date beginDate = DateUtils.dateAdd(new Date(),-1,false);
Date endDate = DateUtils.dateAdd(new Date(),-1,false);
iAnalysisReportLogService.generateReport(AnalysisReportEnum.DAY_REPORT,beginDate,endDate);
}
/**
* 每周1(对应日期的2)凌晨0点-周报生成
*/
@Scheduled(cron = "0 0 0 ? * 1")
public void weekReport() throws ParseException {
Date yestDay = DateUtils.dateAdd(new Date(),-1,false);
Date beginDate = DateUtils.getFirstDayOfWeek(yestDay);
Date endDate = DateUtils.getLastDayOfWeek(yestDay);
iAnalysisReportLogService.generateReport(AnalysisReportEnum.WEEK_REPORT,beginDate,endDate);
}
/**
* 每月第1天凌晨0-月报生成
*/
@Scheduled(cron="0 0 0 1 * ?")
public void monthReport() throws ParseException {
Date yestDay = DateUtils.dateAdd(new Date(),-1,false);
Date beginDate = DateUtils.getFirstDayOfMonth(yestDay);
Date endDate = DateUtils.getLastDayOfMonth(yestDay);
log.warn("monthReport报表开始生成");
iAnalysisReportLogService.generateMonthReport(AnalysisReportEnum.MONTH_REPORT,beginDate,endDate);
}
}
//package com.yeejoin.equipmanage.quartz;
//
//import com.yeejoin.equipmanage.common.enums.AnalysisReportEnum;
//import com.yeejoin.equipmanage.common.utils.DateUtils;
//import com.yeejoin.equipmanage.service.IAnalysisReportLogService;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.scheduling.annotation.EnableScheduling;
//import org.springframework.scheduling.annotation.Scheduled;
//import org.springframework.stereotype.Component;
//
//import java.text.ParseException;
//import java.util.Date;
//
///**
// * @author DELL
// */
//@Component
//@EnableScheduling
//public class AnalysisReportSchedulerJob {
//
// @Autowired
// private IAnalysisReportLogService iAnalysisReportLogService;
// private final Logger log = LoggerFactory.getLogger(AnalysisReportSchedulerJob.class);
// /**
// * 每天凌晨0点-日报生成
// */
// @Scheduled(cron = "0 0 0 * * ?")
// public void dayReport() throws ParseException {
// Date beginDate = DateUtils.dateAdd(new Date(),-1,false);
// Date endDate = DateUtils.dateAdd(new Date(),-1,false);
// iAnalysisReportLogService.generateReport(AnalysisReportEnum.DAY_REPORT,beginDate,endDate);
// }
//
// /**
// * 每周1(对应日期的2)凌晨0点-周报生成
// */
// @Scheduled(cron = "0 0 0 ? * 1")
// public void weekReport() throws ParseException {
// Date yestDay = DateUtils.dateAdd(new Date(),-1,false);
// Date beginDate = DateUtils.getFirstDayOfWeek(yestDay);
// Date endDate = DateUtils.getLastDayOfWeek(yestDay);
// iAnalysisReportLogService.generateReport(AnalysisReportEnum.WEEK_REPORT,beginDate,endDate);
// }
//
// /**
// * 每月第1天凌晨0-月报生成
// */
// @Scheduled(cron="0 0 0 1 * ?")
// public void monthReport() throws ParseException {
// Date yestDay = DateUtils.dateAdd(new Date(),-1,false);
// Date beginDate = DateUtils.getFirstDayOfMonth(yestDay);
// Date endDate = DateUtils.getLastDayOfMonth(yestDay);
// log.warn("monthReport报表开始生成");
// iAnalysisReportLogService.generateMonthReport(AnalysisReportEnum.MONTH_REPORT,beginDate,endDate);
// }
//}
package com.yeejoin.equipmanage.quartz;
import com.yeejoin.amos.component.rule.RuleTrigger;
import com.yeejoin.equipmanage.common.entity.dto.EquipQrDateDto;
import com.yeejoin.equipmanage.service.ICarService;
import com.yeejoin.equipmanage.service.impl.CarServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
/**
* 定时监控车辆相关指标
* @author xxz
*/
@Component
@EnableScheduling
@Slf4j
public class CarPropertyJob {
@Autowired
private ICarService carService;
@Autowired
private RuleTrigger ruleTrigger;
/**
* 车辆赋红码。定时查询车辆启停更新时间,
*/
@Scheduled(cron = "${update.car.qrCode}")
public void UpdateCarQrCode() {
List<Map<String, String>> list = carService.updateCarStartStatus();
EquipQrDateDto equipQrDateDto = new EquipQrDateDto();
equipQrDateDto.setContrast("updateDate");
equipQrDateDto.setSource("car");
equipQrDateDto.setData(list);
try {
ruleTrigger.publish(equipQrDateDto, "中心配置赋码规则/update-qr-code", null);
} catch (Exception e) {
log.error("调用规则失败: {}", e.getMessage());
}
}
}
//package com.yeejoin.equipmanage.quartz;
//
//import com.yeejoin.amos.component.rule.RuleTrigger;
//import com.yeejoin.equipmanage.common.entity.dto.EquipQrDateDto;
//import com.yeejoin.equipmanage.service.ICarService;
//import com.yeejoin.equipmanage.service.impl.CarServiceImpl;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.scheduling.annotation.EnableScheduling;
//import org.springframework.scheduling.annotation.Scheduled;
//import org.springframework.stereotype.Component;
//
//import java.util.List;
//import java.util.Map;
//
///**
// * 定时监控车辆相关指标
// * @author xxz
// */
//@Component
//@EnableScheduling
//@Slf4j
//public class CarPropertyJob {
//
// @Autowired
// private ICarService carService;
//
// @Autowired
// private RuleTrigger ruleTrigger;
//
// /**
// * 车辆赋红码。定时查询车辆启停更新时间,
// */
// @Scheduled(cron = "${update.car.qrCode}")
// public void UpdateCarQrCode() {
//
// List<Map<String, String>> list = carService.updateCarStartStatus();
// EquipQrDateDto equipQrDateDto = new EquipQrDateDto();
// equipQrDateDto.setContrast("updateDate");
// equipQrDateDto.setSource("car");
// equipQrDateDto.setData(list);
// try {
// ruleTrigger.publish(equipQrDateDto, "中心配置赋码规则/update-qr-code", null);
// } catch (Exception e) {
// log.error("调用规则失败: {}", e.getMessage());
// }
// }
//
//}
......@@ -265,6 +265,7 @@ public class RemoteSecurityService {
FeignClientResult feignClientResult = new FeignClientResult();
RequestContext.setProduct(productApp);
RequestContext.setAppKey(appKeyApp);
System.out.println("================================equip");
feignClientResult = Privilege.authClient.idpassword(dPasswordAuthModel);
Map map = (Map) feignClientResult.getResult();
if (map != null) {
......
......@@ -256,7 +256,7 @@ public class ConfirmAlarmServiceImpl extends ServiceImpl<ConfirmAlarmMapper, Equ
set(EquipmentSpecific::getRealtimeIotIndexUpdateDate,new Date()).
eq(EquipmentSpecific::getId,alarm.getEquipmentSpecificId());
equipmentSpecificSerivce.update(wrapper);
MqttReceiveServiceImpl.upAlarmLogStatus(alarmLog.getIotCode(), alarmLog.getEquipmentSpecificIndexKey(), null, equipmentSpecificAlarmLogService, true, "");
// MqttReceiveServiceImpl.upAlarmLogStatus(alarmLog.getIotCode(), alarmLog.getEquipmentSpecificIndexKey(), null, equipmentSpecificAlarmLogService, true, "");
}
// 如果是批量确警,先查询,再确警,用于批量消息推送
isBatch = ent.getIsBatch();
......
......@@ -129,7 +129,7 @@ public class ESeqServiceImpl implements IESeqService {
}
@Override
@Scheduled(cron = "${es.ESEquiplistSpecific.time}")
// @Scheduled(cron = "${es.ESEquiplistSpecific.time}")
public void deleteESEquiplistSpecificBySystemES() {
if(flag){
Calendar calendar = Calendar.getInstance();
......
......@@ -626,12 +626,12 @@ public class EmergencyServiceImpl implements IEmergencyService {
return emergencyMapper.alarmList(page, bizOrgCode, systemCode, types, emergencyLevels, name, cleanStatus, handleStatus);
}
//稳压泵定时向缓存中存昨日启动次数任务
@Scheduled(cron = "${equipment.pressurepump.start.cron}")
private void stationDoubleReport() {
System.out.println("开始定时存储昨日稳压泵启动次数,定时时间为:" + pumpYesterdayStart);
getPressurePumpDay();
}
// //稳压泵定时向缓存中存昨日启动次数任务
// @Scheduled(cron = "${equipment.pressurepump.start.cron}")
// private void stationDoubleReport() {
// System.out.println("开始定时存储昨日稳压泵启动次数,定时时间为:" + pumpYesterdayStart);
// getPressurePumpDay();
// }
@Override
public Map<String, List<PressurePumpCountVo>> getPressurePumpDay() {
......
......@@ -1942,77 +1942,77 @@ public class EquipmentSpecificSerivceImpl extends ServiceImpl<EquipmentSpecificM
return infoVoList;
}
/**
* 在设备报废前30日 每日9点执行 系统推送提醒。设备报废后停止消息推送提醒。。
*
* @throws Exception
*/
@Scheduled(cron = "${equipment.scrap.cron}")
@Transactional(rollbackFor = Exception.class)
public void equipmentScrap() throws Exception {
List<Map<String, Object>> equipSpecificScrap = equipmentSpecificIndexMapper.getEquipSpecificScrap();
equipSpecificScrap.forEach(e -> {
try {
if (e.get("weExpiry") != null) {
int year = Integer.parseInt(e.get("weExpiry").toString());
Date productDate = DateUtils.dateParse(e.get("product").toString().substring(0,10), DateUtils.DATE_PATTERN);
Calendar calendar = Calendar.getInstance();
calendar.setTime(productDate);
calendar.add(Calendar.YEAR, year);
Date now = new Date();
String scrapTime = new SimpleDateFormat(DateUtils.DATE_TIME_PATTERN).format(calendar.getTime());
int day = DateUtils.dateBetween(now, calendar.getTime());
if (day < Integer.parseInt(equipmentScrapDay) && day > -1) {
syncSystemctlMsg(e, scrapTime, day);
} else if (day == -1) {
// 发送emq消息 给idx
Map<String, String> map = new HashMap<>();
map.put("id", e.get("id").toString());
map.put("code", e.get("code").toString());
map.put("bizOrgCode", e.get("bizOrgCode").toString());
map.put("bizOrgName", e.get("bizOrgName").toString());
// /**
// * 在设备报废前30日 每日9点执行 系统推送提醒。设备报废后停止消息推送提醒。。
// *
// * @throws Exception
// */
// @Scheduled(cron = "${equipment.scrap.cron}")
// @Transactional(rollbackFor = Exception.class)
// public void equipmentScrap() throws Exception {
// List<Map<String, Object>> equipSpecificScrap = equipmentSpecificIndexMapper.getEquipSpecificScrap();
// equipSpecificScrap.forEach(e -> {
// try {
// if (e.get("weExpiry") != null) {
// int year = Integer.parseInt(e.get("weExpiry").toString());
// Date productDate = DateUtils.dateParse(e.get("product").toString().substring(0,10), DateUtils.DATE_PATTERN);
// Calendar calendar = Calendar.getInstance();
// calendar.setTime(productDate);
// calendar.add(Calendar.YEAR, year);
//
try {
emqKeeper.getMqttClient().publish("equip/scrap/put", JSONObject.toJSONString(map).getBytes(), 1, false);
} catch (MqttException exp) {
log.info(String.format("发送eqm转kafka消息失败:%s", exp.getMessage()));
}
// idxFeign.handleEquipNotScrapWhenExpired(String.valueOf(e.get("id")), String.valueOf(e.get("bizOrgCode")), String.valueOf(e.get("bizOrgName")));
}
}
} catch (ParseException parseException) {
parseException.printStackTrace();
}
});
}
/**
* 每日计算当天巡检任务数量,任务是否至少有一次执行。。
*
* @throws Exception
*/
@Scheduled(cron = "${equip.patrol.cron:0 0 9 * * ?}")
@Transactional(rollbackFor = Exception.class)
public void patrolTaskStatic() throws Exception {
Map<String, Object> mapPatrol = equipmentSpecificIndexMapper.patrolTaskStatic();
Map<String, String> map = new HashMap<>();
map.put("isFinishOnce", mapPatrol.get("isFinishOnce").toString());
map.put("taskNum", mapPatrol.get("taskNum").toString());
map.put("planNum", mapPatrol.get("planNum").toString());
map.put("bizOrgCode", mapPatrol.get("bizOrgCode").toString());
map.put("bizOrgName", mapPatrol.get("bizOrgName").toString());
try {
log.info("每日统计当天巡检任务情况:{}===========", map);
emqKeeper.getMqttClient().publish("equip/patrol/put", JSONObject.toJSONString(map).getBytes(), 1, false);
} catch (MqttException exp) {
log.info(String.format("发送eqm转kafka消息失败:%s", exp.getMessage()));
}
}
// Date now = new Date();
// String scrapTime = new SimpleDateFormat(DateUtils.DATE_TIME_PATTERN).format(calendar.getTime());
//
// int day = DateUtils.dateBetween(now, calendar.getTime());
// if (day < Integer.parseInt(equipmentScrapDay) && day > -1) {
// syncSystemctlMsg(e, scrapTime, day);
// } else if (day == -1) {
// // 发送emq消息 给idx
// Map<String, String> map = new HashMap<>();
// map.put("id", e.get("id").toString());
// map.put("code", e.get("code").toString());
// map.put("bizOrgCode", e.get("bizOrgCode").toString());
// map.put("bizOrgName", e.get("bizOrgName").toString());
////
// try {
// emqKeeper.getMqttClient().publish("equip/scrap/put", JSONObject.toJSONString(map).getBytes(), 1, false);
// } catch (MqttException exp) {
// log.info(String.format("发送eqm转kafka消息失败:%s", exp.getMessage()));
// }
//// idxFeign.handleEquipNotScrapWhenExpired(String.valueOf(e.get("id")), String.valueOf(e.get("bizOrgCode")), String.valueOf(e.get("bizOrgName")));
// }
// }
//
// } catch (ParseException parseException) {
// parseException.printStackTrace();
// }
// });
//
// }
// /**
// * 每日计算当天巡检任务数量,任务是否至少有一次执行。。
// *
// * @throws Exception
// */
// @Scheduled(cron = "${equip.patrol.cron:0 0 9 * * ?}")
// @Transactional(rollbackFor = Exception.class)
// public void patrolTaskStatic() throws Exception {
// Map<String, Object> mapPatrol = equipmentSpecificIndexMapper.patrolTaskStatic();
// Map<String, String> map = new HashMap<>();
// map.put("isFinishOnce", mapPatrol.get("isFinishOnce").toString());
// map.put("taskNum", mapPatrol.get("taskNum").toString());
// map.put("planNum", mapPatrol.get("planNum").toString());
// map.put("bizOrgCode", mapPatrol.get("bizOrgCode").toString());
// map.put("bizOrgName", mapPatrol.get("bizOrgName").toString());
// try {
// log.info("每日统计当天巡检任务情况:{}===========", map);
// emqKeeper.getMqttClient().publish("equip/patrol/put", JSONObject.toJSONString(map).getBytes(), 1, false);
// } catch (MqttException exp) {
// log.info(String.format("发送eqm转kafka消息失败:%s", exp.getMessage()));
// }
// }
void syncSystemctlMsg(Map<String, Object> map, String scrapTime, int i) {
try {
......
......@@ -76,47 +76,52 @@ public class JxiopCarIotListerServiceImpl {
return;
}
cache.put(iotCode,iotCode);
JSONObject jsonObject = JSONObject.parseObject(message.toString());
//判断是否有效坐标
if (!ObjectUtils.isEmpty(jsonObject.get("FireCar_Longitude")) && !ObjectUtils.isEmpty(jsonObject.get("FireCar_Latitude"))) {
//判断是否存在未结束进程,如果不存在,则进入判断插入开始节点
if (iWlCarMileageService.getUncompleteMileagByIotCode(iotCode)) {
WlCarMileage wlCarMileage = new WlCarMileage();
wlCarMileage.setIotCode(iotCode);
wlCarMileage.setDate(new Date());
// 获取开始坐标
double startLongitude = jsonObject.getDoubleValue("FireCar_Longitude");
double startLatitude = jsonObject.getDoubleValue("FireCar_Latitude");
// String currentTime = "20"+jsonObject.getString("currentTime");
wlCarMileage.setStartLongitude(startLongitude);
wlCarMileage.setStartLatitude(startLatitude);
// Date startTime = UTCToCST();
//时间值被mysql自动转换
Date startTime = new Date();
wlCarMileage.setStartTime(startTime);
wlCarMileage.setStartName(getAddress(startLongitude, startLatitude));
wlCarMileage.setStartSpeed(Double.valueOf(jsonObject.getDoubleValue("FireCar_Speed")).intValue());
logger.info("新增数据信息如下::" + JSONObject.toJSONString(wlCarMileage));
try {
iWlCarMileageService.save(wlCarMileage);
} catch (Exception e) {
e.printStackTrace();
iWlCarMileageService.save(wlCarMileage);
try{
JSONObject jsonObject = JSONObject.parseObject(message.toString());
//判断是否有效坐标
if (!ObjectUtils.isEmpty(jsonObject.get("FireCar_Longitude")) && !ObjectUtils.isEmpty(jsonObject.get("FireCar_Latitude"))) {
//判断是否存在未结束进程,如果不存在,则进入判断插入开始节点
if (iWlCarMileageService.getUncompleteMileagByIotCode(iotCode)) {
WlCarMileage wlCarMileage = new WlCarMileage();
wlCarMileage.setIotCode(iotCode);
wlCarMileage.setDate(new Date());
// 获取开始坐标
double startLongitude = jsonObject.getDoubleValue("FireCar_Longitude");
double startLatitude = jsonObject.getDoubleValue("FireCar_Latitude");
// String currentTime = "20"+jsonObject.getString("currentTime");
wlCarMileage.setStartLongitude(startLongitude);
wlCarMileage.setStartLatitude(startLatitude);
// Date startTime = UTCToCST();
//时间值被mysql自动转换
Date startTime = new Date();
wlCarMileage.setStartTime(startTime);
wlCarMileage.setStartName(getAddress(startLongitude, startLatitude));
wlCarMileage.setStartSpeed(Double.valueOf(jsonObject.getDoubleValue("FireCar_Speed")).intValue());
logger.info("新增数据信息如下::" + JSONObject.toJSONString(wlCarMileage));
try {
iWlCarMileageService.save(wlCarMileage);
} catch (Exception e) {
e.printStackTrace();
iWlCarMileageService.save(wlCarMileage);
}
}
}
this.updateCarLocation(jsonObject, iotCode);
String coordinate = jsonObject.getString("FireCar_Longitude") + "," + jsonObject.getString("FireCar_Latitude");
if (ObjectUtils.isEmpty(redisTemplate.opsForValue().get(iotCode))) {
redisTemplate.opsForValue().set(iotCode, coordinate, 5, TimeUnit.MINUTES);
//logger.info("首次入库redis:{}", iotCode);
} else {
if (!String.valueOf(redisTemplate.opsForValue().get(iotCode)).equals(coordinate)) {
// logger.info("更新入库redis:{},旧:{},新:{}", iotCode, redisTemplate.opsForValue().get(iotCode), coordinate);
this.updateCarLocation(jsonObject, iotCode);
String coordinate = jsonObject.getString("FireCar_Longitude") + "," + jsonObject.getString("FireCar_Latitude");
if (ObjectUtils.isEmpty(redisTemplate.opsForValue().get(iotCode))) {
redisTemplate.opsForValue().set(iotCode, coordinate, 5, TimeUnit.MINUTES);
//logger.info("首次入库redis:{}", iotCode);
} else {
if (!String.valueOf(redisTemplate.opsForValue().get(iotCode)).equals(coordinate)) {
// logger.info("更新入库redis:{},旧:{},新:{}", iotCode, redisTemplate.opsForValue().get(iotCode), coordinate);
redisTemplate.opsForValue().set(iotCode, coordinate, 5, TimeUnit.MINUTES);
}
}
}
}
cache.remove(iotCode);
finally
{
cache.remove(iotCode);
}
}
public String getAddress(double longitude, double lantitude) {
......
......@@ -435,545 +435,545 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
@Async("equipAsyncExecutor")
@Transactional(rollbackFor = Exception.class)
public void handlerMqttRomaMessage(String topic, String message) {
log.info("接收到换流站Kafka消息: {}", message);
TopicEntityVo topicEntity = new TopicEntityVo();
topicEntity.setTopic(topic);
topicEntity.setMessage(message);
List<IotDataVO> iotDatalist = new ArrayList<>();
List<EquipmentSpecificIndex> equipmentSpecificIndexList = new ArrayList<>();
List<EquipmentSpecificAlarm> equipmentSpecificAlarms = new ArrayList<>();
List<IndexStateVo> indexStateList = new ArrayList<>();
JSONObject jsonObject = JSONObject.parseObject(message);
StationMessage stationMessage = JSON.parseObject(String.valueOf(message), StationMessage.class);
String indexAddress = null, value = null, timeStamp = null, quality = null, dataType = null;
if (stationMessage != null) {
dataType = stationMessage.getDataType();
timeStamp = stationMessage.getTimeStamp();
quality = stationMessage.getQuality();
if (dataType.equals(STATE)) {
indexAddress = stationMessage.getScadaId();
value = ONE_1.equalsIgnoreCase(stationMessage.getValue()) ? TRUE : FALSE;
} else if (dataType.equals(DIS_CREATE)) {
indexAddress = stationMessage.getKey();
value = ONE_1_0.equalsIgnoreCase(stationMessage.getValue()) ? TRUE : FALSE;
} else {
indexAddress = stationMessage.getKey();
value = stationMessage.getValue();
}
}
Map<Object, Object> equipmentIndexKeyMap = redisUtils.hmget(RedisKey.EQUIP_INDEX_ADDRESS_KEY);
if (equipmentIndexKeyMap.get(indexAddress) != null) {
EquipmentSpecificIndex equipmentSpeIndex = equipmentSpecificIndexService.getEquipmentSpeIndexByAddress(indexAddress, null, null);
equipmentSpeIndex.setValue(value);
equipmentSpeIndex.setValueLabel(valueTranslate(value, equipmentSpeIndex.getValueEnum()));
equipmentSpeIndex.setEquipmentType(topicEntity.getType());
equipmentSpeIndex.setUpdateDate(new Date());
equipmentSpeIndex.setQuality(quality);
equipmentSpeIndex.setDataType(dataType);
equipmentSpeIndex.setTimeStamp(timeStamp);
equipmentSpeIndex.setUUID(UUIDUtils.getUUID());
IotDataVO iotDataVO = new IotDataVO();
iotDataVO.setKey(equipmentSpeIndex.getNameKey());
iotDataVO.setValue(value);
iotDatalist.add(iotDataVO);
QueryWrapper<EquipmentSpecific> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("id", equipmentSpeIndex.getEquipmentSpecificId());
EquipmentSpecific equipmentSpecific = iEquipmentSpecificSerivce.getOne(queryWrapper);
if (equipmentSpecific == null) {
return;
}
String iotCode = equipmentSpecific.getIotCode();
StringBuilder endIndex = new StringBuilder(iotCode).insert(8, '/');
String iotTopic = "influxdb/" + endIndex;
JSONObject msg = new JSONObject();
msg.put(equipmentSpeIndex.getEquipmentIndexKey(), value);
mqttSendGateway.sendToMqtt(iotTopic, JSON.toJSONString(msg));
List<EquipmentSpecificVo> eqIotCodeList = iEquipmentSpecificSerivce.getEquipAndCarIotcodeByIotcode(iotCode);
if (eqIotCodeList.isEmpty()) {
log.info("该数据{}不存在!", iotCode);
return;
}
if (eqIotCodeList.size() > 1) {
log.info("有重复的{}数据!", iotCode);
}
EquipmentSpecificVo equipmentSpecificVo = eqIotCodeList.get(0);
topicEntity.setType(equipmentSpecificVo.getType());
topicEntity.setCode(equipmentSpecificVo.getCode());
//es存储数据
eSeqService.saveESEquiplistSpecificBySystemESVO(equipmentSpeIndex, String.valueOf(equipmentSpecificVo.getSystemId()), equipmentSpecificVo.getSystemName());
//更新装备性能指标
equipmentSpecificIndexService.updateById(equipmentSpeIndex);
// 更新设备表指标状态
iEquipmentSpecificSerivce.updateEquipmentSpecIndexRealtimeData(equipmentSpeIndex);
equipmentSpecificIndexList.add(equipmentSpeIndex);
indexStateList.add(createIndexStateVo(equipmentSpeIndex));
// 添加指标报告
saveEquipmentAlarmReportDay(equipmentSpeIndex);
// 火眼数据构造告警指标逻辑
equipmentSpeIndex = handleTemperatureAlarm(equipmentSpeIndex, iotDatalist);
boolean alarmFlag = false;
Map<String, String> messageBodyMap = new HashMap<>();
//管网压力、泡沫罐信息、水箱液位告警处理
if (iotDataVO.getKey().equalsIgnoreCase(CAFS_FoamTank_FoamTankLevel) ||
FHS_PipePressureDetector_PipePressure.equalsIgnoreCase(iotDataVO.getKey()) ||
iotDataVO.getKey().equalsIgnoreCase(CAFS_WaterTank_WaterTankLevel)) {
alarmFlag = doFoamTankLevel(iotDataVO, equipmentSpeIndex, messageBodyMap);
}
//消防水池液位处理
if (iotDataVO.getKey().equalsIgnoreCase(FHS_FirePoolDevice_WaterLevel) ||
iotDataVO.getKey().equalsIgnoreCase(FHS_WirelessliquidDetector_WaterLevel)) {
alarmFlag = doWaterPoolLevel(iotDataVO, equipmentSpeIndex, messageBodyMap);
}
// 遥测数据生成告警事件、日志处理
if (iotDataVO.getKey().equalsIgnoreCase(CAFS_FoamTank_FoamTankLevel) ||
FHS_PipePressureDetector_PipePressure.equalsIgnoreCase(iotDataVO.getKey()) ||
iotDataVO.getKey().equalsIgnoreCase(CAFS_WaterTank_WaterTankLevel) ||
iotDataVO.getKey().equalsIgnoreCase(FHS_FirePoolDevice_WaterLevel) ||
iotDataVO.getKey().equalsIgnoreCase(FHS_WirelessliquidDetector_WaterLevel)) {
handlingAlarms(equipmentSpeIndex, alarmFlag);
}
// 指标告警处理
if (equipmentSpeIndex.getIsAlarm() != null && 1 == equipmentSpeIndex.getIsAlarm()) {
equipmentSpecificAlarms.addAll(createIndexAlarmRecord(equipmentSpeIndex, messageBodyMap));
}
// 遥测遥信数据推送云端kafka
JSONObject jsonObjectXf = new JSONObject();
jsonObjectXf.put("data_class", "realdata");
if (equipmentSpeIndex.getIsTrend() == 1) {
jsonObjectXf.put("data_type", "analog");
} else {
jsonObjectXf.put("data_type", "state");
}
String date = DateUtils.date2LongStr(new Date());
jsonObjectXf.put("op_type", "subscribe_emergency");
JSONObject jsonObjectCondition = new JSONObject();
jsonObjectCondition.put("station_psr_id", stationCode);
jsonObjectCondition.put("station_name", stationName);
jsonObjectCondition.put("data_upload_time", date);
jsonObjectXf.put("condition", jsonObjectCondition);
JSONObject jsonObjectData = new JSONObject();
jsonObjectData.put("psrId", stationCode);
jsonObjectData.put("astId", equipmentSpeIndex.getSpecificCode());
jsonObjectData.put("equipType", equipmentSpeIndex.getEquipmentCode());
jsonObjectData.put("name", equipmentSpeIndex.getEquipmentSpecificName() + "-" + equipmentSpeIndex.getEquipmentSpecificIndexName());
if (value.equals("true")) {
jsonObjectData.put("value", "1");
} else if (value.equals("false")) {
jsonObjectData.put("value", "0");
} else {
jsonObjectData.put("value", value);
}
jsonObjectData.put("measurementType", null == equipmentSpeIndex.getEquipmentIndexKey() ? "" : equipmentSpeIndex.getEquipmentIndexKey());
jsonObjectData.put("dateTime", date);
jsonObjectData.put("quality", "0"); // 量测质量码:0 有效,1 无效
List<JSONObject> jsonObjects = Collections.singletonList(jsonObjectData);
jsonObjectXf.put("data", jsonObjects);
// 遥测
if (!isOpenTelemetering && equipmentSpeIndex.getIsTrend() == 1) {
} else {
try {
emqKeeper.getMqttClient().publish("emq.xf.created", jsonObjectXf.toString().getBytes(), 1, false);
log.info("遥测遥信数据推送云端kafka成功");
} catch (MqttException e) {
log.error("遥测遥信数据推送云端kafka失败=====>" + e.getMessage());
}
}
// 报警数据保存
List<EquipmentSpecificAlarmLog> alarmLogs = new ArrayList<>();
if (!ObjectUtils.isEmpty(equipmentSpecificAlarms)) {
equipmentSpecificAlarmService.saveOrUpdateBatch(equipmentSpecificAlarms);
}
// 需要在事务提交之后,否则事务隔离查询不出数据
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
equipmentSpecificAlarms.forEach(action -> {
if (AlarmStatusEnum.BJ.getCode() == action.getStatus()) {
alarmLogs.add(addEquipAlarmLogRecord(action));
if (ValidationUtil.isEmpty(action.getAlamContent())) {
action.setAlamContent(action.getEquipmentSpecificName() + action.getEquipmentSpecificIndexName());
}
mqttSendGateway.sendToMqtt(TopicEnum.EQDQR.getTopic(), JSONArray.toJSON(action).toString());
bool = Boolean.FALSE;
} else {
alarmLogs.addAll(upAlarmLogStatus(action.getIotCode(), action.getEquipmentSpecificIndexKey(), action.getTraceId(),
equipmentSpecificAlarmLogService, false, ""));
mqttSendGateway.sendToMqtt(TopicEnum.EQYQR.getTopic(), JSONArray.toJSON(action).toString());
bool = Boolean.TRUE;
}
});
// // 告警消息推送
if (!ObjectUtils.isEmpty(equipmentSpecificAlarms)) {
publishDataToDCCenterPage(equipmentSpecificIndexList);
}
//推送数据到组态大屏(消防系统)
pushDataToIntegrationPage(equipmentSpecificIndexList);
// 四横八纵遥测信号信息列表刷新
publishNormalIndexValueToPage(equipmentSpecificIndexList);
if ("zd".equals(system)) {
System.out.println("站端系统----------------");
// 向预控系统发送消息
sendEquipSpecIndexToAutosysTopic(equipmentSpecificIndexList);
// 首页性能指标数据订阅
mqttSendGateway.sendToMqtt(indexTopic, JSON.toJSONString(indexStateList));
// 组态大屏消息推送,设备表实时指标修改
intePageSysDataRefresh(equipmentSpecificIndexList, topicEntity);
// 数字换流站同步指标修改
syncSpecificIndexsToGS(equipmentSpecificIndexList);
// 则更新拓扑节点数据及告警状态
updateNodeDateByEquipId(equipmentSpecificIndexList);
// 向画布推送
publishDataToCanvas(equipmentSpecificIndexList);
// 向其他系统推送报警
try {
equipmentAlarmLogsToOtherSystems(alarmLogs);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (equipmentSpecificVo.getEcode() != null) {
String ecode = equipmentSpecificVo.getEcode();
boolean flag = false;
//消防泵
String[] strings = pumpCodes.split(",");
for (String string : strings) {
if (ecode.startsWith(string)) {
//通知>消防应急预案
topicEntity.setType("xfb");
mqttSendGateway.sendToMqtt(emergencyDisposalIndicators, JSONObject.toJSONString(topicEntity));
flag = true;
break;
}
}
// 消防炮
String[] stringxfp = monitorCodes.split(",");
if (!flag) {
for (String string1 : stringxfp) {
if (ecode.startsWith(string1)) {
//通知>消防应急预案
topicEntity.setType("xfp");
mqttSendGateway.sendToMqtt(emergencyDisposalIndicators, JSONObject.toJSONString(topicEntity));
flag = true;
break;
}
}
}
//消防水源
if (!flag) {
List<Map> lit = iEquipmentSpecificSerivce.getWater(equipmentSpecificVo.getId());
if (lit != null && !lit.isEmpty()) {
topicEntity.setType("xfsy");
mqttSendGateway.sendToMqtt(emergencyDisposalIndicators, JSONObject.toJSONString(topicEntity));
}
}
}
}
}
});
}
// log.info("接收到换流站Kafka消息: {}", message);
// TopicEntityVo topicEntity = new TopicEntityVo();
// topicEntity.setTopic(topic);
// topicEntity.setMessage(message);
//
// List<IotDataVO> iotDatalist = new ArrayList<>();
// List<EquipmentSpecificIndex> equipmentSpecificIndexList = new ArrayList<>();
// List<EquipmentSpecificAlarm> equipmentSpecificAlarms = new ArrayList<>();
// List<IndexStateVo> indexStateList = new ArrayList<>();
// JSONObject jsonObject = JSONObject.parseObject(message);
//
// StationMessage stationMessage = JSON.parseObject(String.valueOf(message), StationMessage.class);
// String indexAddress = null, value = null, timeStamp = null, quality = null, dataType = null;
// if (stationMessage != null) {
// dataType = stationMessage.getDataType();
// timeStamp = stationMessage.getTimeStamp();
// quality = stationMessage.getQuality();
//
// if (dataType.equals(STATE)) {
// indexAddress = stationMessage.getScadaId();
// value = ONE_1.equalsIgnoreCase(stationMessage.getValue()) ? TRUE : FALSE;
// } else if (dataType.equals(DIS_CREATE)) {
// indexAddress = stationMessage.getKey();
// value = ONE_1_0.equalsIgnoreCase(stationMessage.getValue()) ? TRUE : FALSE;
// } else {
// indexAddress = stationMessage.getKey();
// value = stationMessage.getValue();
// }
// }
//
// Map<Object, Object> equipmentIndexKeyMap = redisUtils.hmget(RedisKey.EQUIP_INDEX_ADDRESS_KEY);
// if (equipmentIndexKeyMap.get(indexAddress) != null) {
// EquipmentSpecificIndex equipmentSpeIndex = equipmentSpecificIndexService.getEquipmentSpeIndexByAddress(indexAddress, null, null);
//
// equipmentSpeIndex.setValue(value);
// equipmentSpeIndex.setValueLabel(valueTranslate(value, equipmentSpeIndex.getValueEnum()));
// equipmentSpeIndex.setEquipmentType(topicEntity.getType());
// equipmentSpeIndex.setUpdateDate(new Date());
// equipmentSpeIndex.setQuality(quality);
// equipmentSpeIndex.setDataType(dataType);
// equipmentSpeIndex.setTimeStamp(timeStamp);
// equipmentSpeIndex.setUUID(UUIDUtils.getUUID());
//
// IotDataVO iotDataVO = new IotDataVO();
// iotDataVO.setKey(equipmentSpeIndex.getNameKey());
// iotDataVO.setValue(value);
// iotDatalist.add(iotDataVO);
//
// QueryWrapper<EquipmentSpecific> queryWrapper = new QueryWrapper<>();
// queryWrapper.eq("id", equipmentSpeIndex.getEquipmentSpecificId());
// EquipmentSpecific equipmentSpecific = iEquipmentSpecificSerivce.getOne(queryWrapper);
// if (equipmentSpecific == null) {
// return;
// }
// String iotCode = equipmentSpecific.getIotCode();
// StringBuilder endIndex = new StringBuilder(iotCode).insert(8, '/');
// String iotTopic = "influxdb/" + endIndex;
// JSONObject msg = new JSONObject();
// msg.put(equipmentSpeIndex.getEquipmentIndexKey(), value);
// mqttSendGateway.sendToMqtt(iotTopic, JSON.toJSONString(msg));
//
// List<EquipmentSpecificVo> eqIotCodeList = iEquipmentSpecificSerivce.getEquipAndCarIotcodeByIotcode(iotCode);
//
// if (eqIotCodeList.isEmpty()) {
// log.info("该数据{}不存在!", iotCode);
// return;
// }
// if (eqIotCodeList.size() > 1) {
// log.info("有重复的{}数据!", iotCode);
// }
// EquipmentSpecificVo equipmentSpecificVo = eqIotCodeList.get(0);
// topicEntity.setType(equipmentSpecificVo.getType());
// topicEntity.setCode(equipmentSpecificVo.getCode());
//
// //es存储数据
// eSeqService.saveESEquiplistSpecificBySystemESVO(equipmentSpeIndex, String.valueOf(equipmentSpecificVo.getSystemId()), equipmentSpecificVo.getSystemName());
// //更新装备性能指标
// equipmentSpecificIndexService.updateById(equipmentSpeIndex);
//
// // 更新设备表指标状态
// iEquipmentSpecificSerivce.updateEquipmentSpecIndexRealtimeData(equipmentSpeIndex);
//
// equipmentSpecificIndexList.add(equipmentSpeIndex);
//
// indexStateList.add(createIndexStateVo(equipmentSpeIndex));
//
// // 添加指标报告
// saveEquipmentAlarmReportDay(equipmentSpeIndex);
//
// // 火眼数据构造告警指标逻辑
// equipmentSpeIndex = handleTemperatureAlarm(equipmentSpeIndex, iotDatalist);
//
// boolean alarmFlag = false;
// Map<String, String> messageBodyMap = new HashMap<>();
// //管网压力、泡沫罐信息、水箱液位告警处理
// if (iotDataVO.getKey().equalsIgnoreCase(CAFS_FoamTank_FoamTankLevel) ||
// FHS_PipePressureDetector_PipePressure.equalsIgnoreCase(iotDataVO.getKey()) ||
// iotDataVO.getKey().equalsIgnoreCase(CAFS_WaterTank_WaterTankLevel)) {
// alarmFlag = doFoamTankLevel(iotDataVO, equipmentSpeIndex, messageBodyMap);
// }
// //消防水池液位处理
// if (iotDataVO.getKey().equalsIgnoreCase(FHS_FirePoolDevice_WaterLevel) ||
// iotDataVO.getKey().equalsIgnoreCase(FHS_WirelessliquidDetector_WaterLevel)) {
// alarmFlag = doWaterPoolLevel(iotDataVO, equipmentSpeIndex, messageBodyMap);
// }
// // 遥测数据生成告警事件、日志处理
// if (iotDataVO.getKey().equalsIgnoreCase(CAFS_FoamTank_FoamTankLevel) ||
// FHS_PipePressureDetector_PipePressure.equalsIgnoreCase(iotDataVO.getKey()) ||
// iotDataVO.getKey().equalsIgnoreCase(CAFS_WaterTank_WaterTankLevel) ||
// iotDataVO.getKey().equalsIgnoreCase(FHS_FirePoolDevice_WaterLevel) ||
// iotDataVO.getKey().equalsIgnoreCase(FHS_WirelessliquidDetector_WaterLevel)) {
// handlingAlarms(equipmentSpeIndex, alarmFlag);
// }
//
// // 指标告警处理
// if (equipmentSpeIndex.getIsAlarm() != null && 1 == equipmentSpeIndex.getIsAlarm()) {
// equipmentSpecificAlarms.addAll(createIndexAlarmRecord(equipmentSpeIndex, messageBodyMap));
// }
// // 遥测遥信数据推送云端kafka
// JSONObject jsonObjectXf = new JSONObject();
// jsonObjectXf.put("data_class", "realdata");
//
// if (equipmentSpeIndex.getIsTrend() == 1) {
// jsonObjectXf.put("data_type", "analog");
// } else {
// jsonObjectXf.put("data_type", "state");
// }
//
// String date = DateUtils.date2LongStr(new Date());
// jsonObjectXf.put("op_type", "subscribe_emergency");
// JSONObject jsonObjectCondition = new JSONObject();
// jsonObjectCondition.put("station_psr_id", stationCode);
// jsonObjectCondition.put("station_name", stationName);
// jsonObjectCondition.put("data_upload_time", date);
// jsonObjectXf.put("condition", jsonObjectCondition);
//
// JSONObject jsonObjectData = new JSONObject();
// jsonObjectData.put("psrId", stationCode);
// jsonObjectData.put("astId", equipmentSpeIndex.getSpecificCode());
// jsonObjectData.put("equipType", equipmentSpeIndex.getEquipmentCode());
// jsonObjectData.put("name", equipmentSpeIndex.getEquipmentSpecificName() + "-" + equipmentSpeIndex.getEquipmentSpecificIndexName());
// if (value.equals("true")) {
// jsonObjectData.put("value", "1");
// } else if (value.equals("false")) {
// jsonObjectData.put("value", "0");
// } else {
// jsonObjectData.put("value", value);
// }
// jsonObjectData.put("measurementType", null == equipmentSpeIndex.getEquipmentIndexKey() ? "" : equipmentSpeIndex.getEquipmentIndexKey());
// jsonObjectData.put("dateTime", date);
// jsonObjectData.put("quality", "0"); // 量测质量码:0 有效,1 无效
// List<JSONObject> jsonObjects = Collections.singletonList(jsonObjectData);
//
// jsonObjectXf.put("data", jsonObjects);
//
// // 遥测
// if (!isOpenTelemetering && equipmentSpeIndex.getIsTrend() == 1) {
//
// } else {
// try {
// emqKeeper.getMqttClient().publish("emq.xf.created", jsonObjectXf.toString().getBytes(), 1, false);
// log.info("遥测遥信数据推送云端kafka成功");
// } catch (MqttException e) {
// log.error("遥测遥信数据推送云端kafka失败=====>" + e.getMessage());
// }
// }
//
// // 报警数据保存
// List<EquipmentSpecificAlarmLog> alarmLogs = new ArrayList<>();
// if (!ObjectUtils.isEmpty(equipmentSpecificAlarms)) {
// equipmentSpecificAlarmService.saveOrUpdateBatch(equipmentSpecificAlarms);
// }
//
// // 需要在事务提交之后,否则事务隔离查询不出数据
// TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
// @Override
// public void afterCommit() {
// equipmentSpecificAlarms.forEach(action -> {
// if (AlarmStatusEnum.BJ.getCode() == action.getStatus()) {
// alarmLogs.add(addEquipAlarmLogRecord(action));
// if (ValidationUtil.isEmpty(action.getAlamContent())) {
// action.setAlamContent(action.getEquipmentSpecificName() + action.getEquipmentSpecificIndexName());
// }
// mqttSendGateway.sendToMqtt(TopicEnum.EQDQR.getTopic(), JSONArray.toJSON(action).toString());
// bool = Boolean.FALSE;
// } else {
// alarmLogs.addAll(upAlarmLogStatus(action.getIotCode(), action.getEquipmentSpecificIndexKey(), action.getTraceId(),
// equipmentSpecificAlarmLogService, false, ""));
// mqttSendGateway.sendToMqtt(TopicEnum.EQYQR.getTopic(), JSONArray.toJSON(action).toString());
// bool = Boolean.TRUE;
// }
// });
//
// // // 告警消息推送
// if (!ObjectUtils.isEmpty(equipmentSpecificAlarms)) {
// publishDataToDCCenterPage(equipmentSpecificIndexList);
// }
//
// //推送数据到组态大屏(消防系统)
// pushDataToIntegrationPage(equipmentSpecificIndexList);
// // 四横八纵遥测信号信息列表刷新
// publishNormalIndexValueToPage(equipmentSpecificIndexList);
// if ("zd".equals(system)) {
// System.out.println("站端系统----------------");
//
// // 向预控系统发送消息
// sendEquipSpecIndexToAutosysTopic(equipmentSpecificIndexList);
//
// // 首页性能指标数据订阅
// mqttSendGateway.sendToMqtt(indexTopic, JSON.toJSONString(indexStateList));
//
// // 组态大屏消息推送,设备表实时指标修改
// intePageSysDataRefresh(equipmentSpecificIndexList, topicEntity);
//
// // 数字换流站同步指标修改
// syncSpecificIndexsToGS(equipmentSpecificIndexList);
//
// // 则更新拓扑节点数据及告警状态
// updateNodeDateByEquipId(equipmentSpecificIndexList);
//
// // 向画布推送
// publishDataToCanvas(equipmentSpecificIndexList);
//
// // 向其他系统推送报警
// try {
// equipmentAlarmLogsToOtherSystems(alarmLogs);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
//
// if (equipmentSpecificVo.getEcode() != null) {
// String ecode = equipmentSpecificVo.getEcode();
// boolean flag = false;
//
// //消防泵
// String[] strings = pumpCodes.split(",");
// for (String string : strings) {
// if (ecode.startsWith(string)) {
// //通知>消防应急预案
// topicEntity.setType("xfb");
// mqttSendGateway.sendToMqtt(emergencyDisposalIndicators, JSONObject.toJSONString(topicEntity));
// flag = true;
// break;
// }
// }
//
// // 消防炮
// String[] stringxfp = monitorCodes.split(",");
// if (!flag) {
// for (String string1 : stringxfp) {
// if (ecode.startsWith(string1)) {
// //通知>消防应急预案
// topicEntity.setType("xfp");
// mqttSendGateway.sendToMqtt(emergencyDisposalIndicators, JSONObject.toJSONString(topicEntity));
// flag = true;
// break;
// }
// }
// }
// //消防水源
// if (!flag) {
// List<Map> lit = iEquipmentSpecificSerivce.getWater(equipmentSpecificVo.getId());
// if (lit != null && !lit.isEmpty()) {
// topicEntity.setType("xfsy");
// mqttSendGateway.sendToMqtt(emergencyDisposalIndicators, JSONObject.toJSONString(topicEntity));
// }
// }
// }
// }
// }
// });
// }
}
@Override
@Transactional(rollbackFor = Exception.class)
public void handlerMqttStationMessage(String topic, String message) {
log.info("接收到韶山Kafka消息: {}", message);
TopicEntityVo topicEntity = new TopicEntityVo();
topicEntity.setTopic(topic);
topicEntity.setMessage(message);
List<IotDataVO> iotDatalist = new ArrayList<>();
List<EquipmentSpecificIndex> equipmentSpecificIndexList = new ArrayList<>();
List<EquipmentSpecificAlarm> equipmentSpecificAlarms = new ArrayList<>();
List<IndexStateVo> indexStateList = new ArrayList<>();
SShanStationMessage sShanStationMessage = JSON.parseObject(String.valueOf(message), SShanStationMessage.class);
Map<Object, Object> equipmentIndexKeyMap = redisUtils.hmget(RedisKey.EQUIP_INDEX_ADDRESS_KEY_STATION);
sShanStationMessage.getWarns().forEach(m -> {
String value;
if (message.contains("eventtextL1")) {
// 此通道目前专供韶山告警消息value赋值,没必要用配置文件多字段进行逐个判断,所以暂时不修改
value = m.getEventTextL1().contains("出现") ? TRUE : FALSE;
} else {
value = String.valueOf(BeanUtil.judgePropertyContainsChar(m.getEventstatus(), specialChars));
}
if (equipmentIndexKeyMap.get(m.getPointId()) != null) {
EquipmentSpecificIndex equipmentSpeIndex = equipmentSpecificIndexService.getEquipmentSpeIndexByAddress(null, m.getPointId(), null);
if (equipmentSpeIndex == null) {
return;
}
equipmentSpeIndex.setValue(value);
equipmentSpeIndex.setValueLabel(valueTranslate(value, equipmentSpeIndex.getValueEnum()));
equipmentSpeIndex.setEquipmentType(topicEntity.getType());
equipmentSpeIndex.setUpdateDate(new Date());
equipmentSpeIndex.setTimeStamp(sShanStationMessage.getTimestamp());
equipmentSpeIndex.setUUID(UUIDUtils.getUUID());
IotDataVO iotDataVO = new IotDataVO();
iotDataVO.setKey(equipmentSpeIndex.getNameKey());
iotDataVO.setValue(value);
iotDatalist.add(iotDataVO);
QueryWrapper<EquipmentSpecific> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("id", equipmentSpeIndex.getEquipmentSpecificId());
EquipmentSpecific equipmentSpecific = iEquipmentSpecificSerivce.getOne(queryWrapper);
if (equipmentSpecific == null) {
return;
}
String iotCode = equipmentSpecific.getIotCode();
StringBuilder endIndex = new StringBuilder(iotCode).insert(8, '/');
String iotTopic = "influxdb/" + endIndex;
JSONObject msg = new JSONObject();
msg.put(equipmentSpeIndex.getEquipmentIndexKey(), value);
mqttSendGateway.sendToMqtt(iotTopic, JSON.toJSONString(msg));
List<EquipmentSpecificVo> eqIotCodeList = iEquipmentSpecificSerivce.getEquipAndCarIotcodeByIotcode(iotCode);
if (eqIotCodeList.isEmpty()) {
log.info("该数据{}不存在!", iotCode);
return;
}
if (eqIotCodeList.size() > 1) {
log.info("有重复的{}数据!", iotCode);
}
EquipmentSpecificVo equipmentSpecificVo = eqIotCodeList.get(0);
topicEntity.setType(equipmentSpecificVo.getType());
topicEntity.setCode(equipmentSpecificVo.getCode());
//es存储数据
eSeqService.saveESEquiplistSpecificBySystemESVO(equipmentSpeIndex, String.valueOf(equipmentSpecificVo.getSystemId()), equipmentSpecificVo.getSystemName());
//更新装备性能指标
equipmentSpecificIndexService.updateById(equipmentSpeIndex);
// 更新设备表指标状态
iEquipmentSpecificSerivce.updateEquipmentSpecIndexRealtimeData(equipmentSpeIndex);
equipmentSpecificIndexList.add(equipmentSpeIndex);
indexStateList.add(createIndexStateVo(equipmentSpeIndex));
// 添加指标报告
saveEquipmentAlarmReportDay(equipmentSpeIndex);
// 火眼数据构造告警指标逻辑
equipmentSpeIndex = handleTemperatureAlarm(equipmentSpeIndex, iotDatalist);
boolean alarmFlag = false;
Map<String, String> messageBodyMap = new HashMap<>();
//管网压力、泡沫罐信息、水箱液位告警处理
if (iotDataVO.getKey().equalsIgnoreCase(CAFS_FoamTank_FoamTankLevel) ||
FHS_PipePressureDetector_PipePressure.equalsIgnoreCase(iotDataVO.getKey()) ||
iotDataVO.getKey().equalsIgnoreCase(CAFS_WaterTank_WaterTankLevel)) {
alarmFlag = doFoamTankLevel(iotDataVO, equipmentSpeIndex, messageBodyMap);
}
//消防水池液位处理
if (iotDataVO.getKey().equalsIgnoreCase(FHS_FirePoolDevice_WaterLevel) ||
iotDataVO.getKey().equalsIgnoreCase(FHS_WirelessliquidDetector_WaterLevel)) {
alarmFlag = doWaterPoolLevel(iotDataVO, equipmentSpeIndex, messageBodyMap);
}
// 遥测数据生成告警事件、日志处理
if (iotDataVO.getKey().equalsIgnoreCase(CAFS_FoamTank_FoamTankLevel) ||
FHS_PipePressureDetector_PipePressure.equalsIgnoreCase(iotDataVO.getKey()) ||
iotDataVO.getKey().equalsIgnoreCase(CAFS_WaterTank_WaterTankLevel) ||
iotDataVO.getKey().equalsIgnoreCase(FHS_FirePoolDevice_WaterLevel) ||
iotDataVO.getKey().equalsIgnoreCase(FHS_WirelessliquidDetector_WaterLevel)) {
handlingAlarms(equipmentSpeIndex, alarmFlag);
}
// 指标告警处理
if (equipmentSpeIndex.getIsAlarm() != null && 1 == equipmentSpeIndex.getIsAlarm()) {
equipmentSpecificAlarms.addAll(createIndexAlarmRecord(equipmentSpeIndex, messageBodyMap));
}
// 遥测遥信数据推送云端kafka
JSONObject jsonObjectXf = new JSONObject();
jsonObjectXf.put("data_class", "realdata");
if (equipmentSpeIndex.getIsTrend() == 1) {
jsonObjectXf.put("data_type", "analog");
} else {
jsonObjectXf.put("data_type", "state");
}
String date = DateUtils.date2LongStr(new Date());
jsonObjectXf.put("op_type", "subscribe_emergency");
JSONObject jsonObjectCondition = new JSONObject();
jsonObjectCondition.put("station_psr_id", stationCode);
jsonObjectCondition.put("station_name", stationName);
jsonObjectCondition.put("data_upload_time", date);
jsonObjectXf.put("condition", jsonObjectCondition);
JSONObject jsonObjectData = new JSONObject();
jsonObjectData.put("psrId", stationCode);
jsonObjectData.put("astId", equipmentSpeIndex.getSpecificCode());
jsonObjectData.put("equipType", equipmentSpeIndex.getEquipmentCode());
jsonObjectData.put("name", equipmentSpeIndex.getEquipmentSpecificName() + "-" + equipmentSpeIndex.getEquipmentSpecificIndexName());
if (value.equals("true")) {
jsonObjectData.put("value", "1");
} else {
jsonObjectData.put("value", "0");
}
jsonObjectData.put("measurementType", null == equipmentSpeIndex.getEquipmentIndexKey() ? "" : equipmentSpeIndex.getEquipmentIndexKey());
jsonObjectData.put("dateTime", date);
jsonObjectData.put("quality", "0"); // 量测质量码:0 有效,1 无效
List<JSONObject> jsonObjects = Collections.singletonList(jsonObjectData);
jsonObjectXf.put("data", jsonObjects);
// 遥测
if (!isOpenTelemetering && equipmentSpeIndex.getIsTrend() == 1) {
} else {
try {
emqKeeper.getMqttClient().publish("emq.xf.created", jsonObjectXf.toString().getBytes(), 1, false);
log.info("遥测遥信数据推送云端kafka成功");
} catch (MqttException e) {
log.error("遥测遥信数据推送云端kafka失败=====>" + e.getMessage());
e.printStackTrace();
}
}
// 报警数据保存
List<EquipmentSpecificAlarmLog> alarmLogs = new ArrayList<>();
if (!ObjectUtils.isEmpty(equipmentSpecificAlarms)) {
equipmentSpecificAlarmService.saveOrUpdateBatch(equipmentSpecificAlarms);
}
// 需要在事务提交之后,否则事务隔离查询不出数据
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
equipmentSpecificAlarms.forEach(action -> {
if (AlarmStatusEnum.BJ.getCode() == action.getStatus()) {
alarmLogs.add(addEquipAlarmLogRecord(action));
if (ValidationUtil.isEmpty(action.getAlamContent())) {
action.setAlamContent(action.getEquipmentSpecificName() + action.getEquipmentSpecificIndexName());
}
mqttSendGateway.sendToMqtt(TopicEnum.EQDQR.getTopic(), JSONArray.toJSON(action).toString());
bool = Boolean.FALSE;
} else {
alarmLogs.addAll(upAlarmLogStatus(action.getIotCode(), action.getEquipmentSpecificIndexKey(), action.getTraceId(),
equipmentSpecificAlarmLogService, false, ""));
mqttSendGateway.sendToMqtt(TopicEnum.EQYQR.getTopic(), JSONArray.toJSON(action).toString());
bool = Boolean.TRUE;
}
});
// // 告警消息推送
if (!ObjectUtils.isEmpty(equipmentSpecificAlarms)) {
publishDataToDCCenterPage(equipmentSpecificIndexList);
}
//推送数据到组态大屏(消防系统)
pushDataToIntegrationPage(equipmentSpecificIndexList);
// 四横八纵遥测信号信息列表刷新
publishNormalIndexValueToPage(equipmentSpecificIndexList);
if ("zd".equals(system)) {
System.out.println("站端系统----------------");
// 向预控系统发送消息
sendEquipSpecIndexToAutosysTopic(equipmentSpecificIndexList);
// 首页性能指标数据订阅
mqttSendGateway.sendToMqtt(indexTopic, JSON.toJSONString(indexStateList));
// 组态大屏消息推送,设备表实时指标修改
intePageSysDataRefresh(equipmentSpecificIndexList, topicEntity);
// 数字换流站同步指标修改
syncSpecificIndexsToGS(equipmentSpecificIndexList);
// 则更新拓扑节点数据及告警状态
updateNodeDateByEquipId(equipmentSpecificIndexList);
// 向画布推送
publishDataToCanvas(equipmentSpecificIndexList);
// 向其他系统推送报警
try {
equipmentAlarmLogsToOtherSystems(alarmLogs);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (equipmentSpecificVo.getEcode() != null) {
String ecode = equipmentSpecificVo.getEcode();
boolean flag = false;
//消防泵
String[] strings = pumpCodes.split(",");
for (String string : strings) {
if (ecode.startsWith(string)) {
//通知>消防应急预案
topicEntity.setType("xfb");
mqttSendGateway.sendToMqtt(emergencyDisposalIndicators, JSONObject.toJSONString(topicEntity));
flag = true;
break;
}
}
// 消防炮
String[] stringxfp = monitorCodes.split(",");
if (!flag) {
for (String string1 : stringxfp) {
if (ecode.startsWith(string1)) {
//通知>消防应急预案
topicEntity.setType("xfp");
mqttSendGateway.sendToMqtt(emergencyDisposalIndicators, JSONObject.toJSONString(topicEntity));
flag = true;
break;
}
}
}
//消防水源
if (!flag) {
List<Map> lit = iEquipmentSpecificSerivce.getWater(equipmentSpecificVo.getId());
if (lit != null && !lit.isEmpty()) {
topicEntity.setType("xfsy");
mqttSendGateway.sendToMqtt(emergencyDisposalIndicators, JSONObject.toJSONString(topicEntity));
}
}
}
}
}
});
}
});
// log.info("接收到韶山Kafka消息: {}", message);
// TopicEntityVo topicEntity = new TopicEntityVo();
// topicEntity.setTopic(topic);
// topicEntity.setMessage(message);
//
// List<IotDataVO> iotDatalist = new ArrayList<>();
// List<EquipmentSpecificIndex> equipmentSpecificIndexList = new ArrayList<>();
// List<EquipmentSpecificAlarm> equipmentSpecificAlarms = new ArrayList<>();
// List<IndexStateVo> indexStateList = new ArrayList<>();
//
// SShanStationMessage sShanStationMessage = JSON.parseObject(String.valueOf(message), SShanStationMessage.class);
// Map<Object, Object> equipmentIndexKeyMap = redisUtils.hmget(RedisKey.EQUIP_INDEX_ADDRESS_KEY_STATION);
// sShanStationMessage.getWarns().forEach(m -> {
// String value;
// if (message.contains("eventtextL1")) {
// // 此通道目前专供韶山告警消息value赋值,没必要用配置文件多字段进行逐个判断,所以暂时不修改
// value = m.getEventTextL1().contains("出现") ? TRUE : FALSE;
// } else {
// value = String.valueOf(BeanUtil.judgePropertyContainsChar(m.getEventstatus(), specialChars));
// }
//
// if (equipmentIndexKeyMap.get(m.getPointId()) != null) {
// EquipmentSpecificIndex equipmentSpeIndex = equipmentSpecificIndexService.getEquipmentSpeIndexByAddress(null, m.getPointId(), null);
// if (equipmentSpeIndex == null) {
// return;
// }
// equipmentSpeIndex.setValue(value);
// equipmentSpeIndex.setValueLabel(valueTranslate(value, equipmentSpeIndex.getValueEnum()));
// equipmentSpeIndex.setEquipmentType(topicEntity.getType());
// equipmentSpeIndex.setUpdateDate(new Date());
// equipmentSpeIndex.setTimeStamp(sShanStationMessage.getTimestamp());
// equipmentSpeIndex.setUUID(UUIDUtils.getUUID());
//
// IotDataVO iotDataVO = new IotDataVO();
// iotDataVO.setKey(equipmentSpeIndex.getNameKey());
// iotDataVO.setValue(value);
// iotDatalist.add(iotDataVO);
//
// QueryWrapper<EquipmentSpecific> queryWrapper = new QueryWrapper<>();
// queryWrapper.eq("id", equipmentSpeIndex.getEquipmentSpecificId());
// EquipmentSpecific equipmentSpecific = iEquipmentSpecificSerivce.getOne(queryWrapper);
// if (equipmentSpecific == null) {
// return;
// }
// String iotCode = equipmentSpecific.getIotCode();
// StringBuilder endIndex = new StringBuilder(iotCode).insert(8, '/');
// String iotTopic = "influxdb/" + endIndex;
// JSONObject msg = new JSONObject();
// msg.put(equipmentSpeIndex.getEquipmentIndexKey(), value);
// mqttSendGateway.sendToMqtt(iotTopic, JSON.toJSONString(msg));
//
// List<EquipmentSpecificVo> eqIotCodeList = iEquipmentSpecificSerivce.getEquipAndCarIotcodeByIotcode(iotCode);
//
// if (eqIotCodeList.isEmpty()) {
// log.info("该数据{}不存在!", iotCode);
// return;
// }
// if (eqIotCodeList.size() > 1) {
// log.info("有重复的{}数据!", iotCode);
// }
// EquipmentSpecificVo equipmentSpecificVo = eqIotCodeList.get(0);
// topicEntity.setType(equipmentSpecificVo.getType());
// topicEntity.setCode(equipmentSpecificVo.getCode());
//
// //es存储数据
// eSeqService.saveESEquiplistSpecificBySystemESVO(equipmentSpeIndex, String.valueOf(equipmentSpecificVo.getSystemId()), equipmentSpecificVo.getSystemName());
// //更新装备性能指标
// equipmentSpecificIndexService.updateById(equipmentSpeIndex);
//
// // 更新设备表指标状态
// iEquipmentSpecificSerivce.updateEquipmentSpecIndexRealtimeData(equipmentSpeIndex);
//
// equipmentSpecificIndexList.add(equipmentSpeIndex);
//
// indexStateList.add(createIndexStateVo(equipmentSpeIndex));
//
// // 添加指标报告
// saveEquipmentAlarmReportDay(equipmentSpeIndex);
//
// // 火眼数据构造告警指标逻辑
// equipmentSpeIndex = handleTemperatureAlarm(equipmentSpeIndex, iotDatalist);
//
// boolean alarmFlag = false;
// Map<String, String> messageBodyMap = new HashMap<>();
// //管网压力、泡沫罐信息、水箱液位告警处理
// if (iotDataVO.getKey().equalsIgnoreCase(CAFS_FoamTank_FoamTankLevel) ||
// FHS_PipePressureDetector_PipePressure.equalsIgnoreCase(iotDataVO.getKey()) ||
// iotDataVO.getKey().equalsIgnoreCase(CAFS_WaterTank_WaterTankLevel)) {
// alarmFlag = doFoamTankLevel(iotDataVO, equipmentSpeIndex, messageBodyMap);
// }
// //消防水池液位处理
// if (iotDataVO.getKey().equalsIgnoreCase(FHS_FirePoolDevice_WaterLevel) ||
// iotDataVO.getKey().equalsIgnoreCase(FHS_WirelessliquidDetector_WaterLevel)) {
// alarmFlag = doWaterPoolLevel(iotDataVO, equipmentSpeIndex, messageBodyMap);
// }
// // 遥测数据生成告警事件、日志处理
// if (iotDataVO.getKey().equalsIgnoreCase(CAFS_FoamTank_FoamTankLevel) ||
// FHS_PipePressureDetector_PipePressure.equalsIgnoreCase(iotDataVO.getKey()) ||
// iotDataVO.getKey().equalsIgnoreCase(CAFS_WaterTank_WaterTankLevel) ||
// iotDataVO.getKey().equalsIgnoreCase(FHS_FirePoolDevice_WaterLevel) ||
// iotDataVO.getKey().equalsIgnoreCase(FHS_WirelessliquidDetector_WaterLevel)) {
// handlingAlarms(equipmentSpeIndex, alarmFlag);
// }
//
// // 指标告警处理
// if (equipmentSpeIndex.getIsAlarm() != null && 1 == equipmentSpeIndex.getIsAlarm()) {
// equipmentSpecificAlarms.addAll(createIndexAlarmRecord(equipmentSpeIndex, messageBodyMap));
// }
// // 遥测遥信数据推送云端kafka
// JSONObject jsonObjectXf = new JSONObject();
// jsonObjectXf.put("data_class", "realdata");
//
// if (equipmentSpeIndex.getIsTrend() == 1) {
// jsonObjectXf.put("data_type", "analog");
// } else {
// jsonObjectXf.put("data_type", "state");
// }
//
// String date = DateUtils.date2LongStr(new Date());
// jsonObjectXf.put("op_type", "subscribe_emergency");
// JSONObject jsonObjectCondition = new JSONObject();
// jsonObjectCondition.put("station_psr_id", stationCode);
// jsonObjectCondition.put("station_name", stationName);
// jsonObjectCondition.put("data_upload_time", date);
// jsonObjectXf.put("condition", jsonObjectCondition);
//
// JSONObject jsonObjectData = new JSONObject();
// jsonObjectData.put("psrId", stationCode);
// jsonObjectData.put("astId", equipmentSpeIndex.getSpecificCode());
// jsonObjectData.put("equipType", equipmentSpeIndex.getEquipmentCode());
// jsonObjectData.put("name", equipmentSpeIndex.getEquipmentSpecificName() + "-" + equipmentSpeIndex.getEquipmentSpecificIndexName());
// if (value.equals("true")) {
// jsonObjectData.put("value", "1");
// } else {
// jsonObjectData.put("value", "0");
// }
// jsonObjectData.put("measurementType", null == equipmentSpeIndex.getEquipmentIndexKey() ? "" : equipmentSpeIndex.getEquipmentIndexKey());
// jsonObjectData.put("dateTime", date);
// jsonObjectData.put("quality", "0"); // 量测质量码:0 有效,1 无效
// List<JSONObject> jsonObjects = Collections.singletonList(jsonObjectData);
//
// jsonObjectXf.put("data", jsonObjects);
//
// // 遥测
// if (!isOpenTelemetering && equipmentSpeIndex.getIsTrend() == 1) {
//
// } else {
// try {
// emqKeeper.getMqttClient().publish("emq.xf.created", jsonObjectXf.toString().getBytes(), 1, false);
// log.info("遥测遥信数据推送云端kafka成功");
// } catch (MqttException e) {
// log.error("遥测遥信数据推送云端kafka失败=====>" + e.getMessage());
// e.printStackTrace();
// }
// }
//
// // 报警数据保存
// List<EquipmentSpecificAlarmLog> alarmLogs = new ArrayList<>();
// if (!ObjectUtils.isEmpty(equipmentSpecificAlarms)) {
// equipmentSpecificAlarmService.saveOrUpdateBatch(equipmentSpecificAlarms);
// }
//
// // 需要在事务提交之后,否则事务隔离查询不出数据
// TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
// @Override
// public void afterCommit() {
// equipmentSpecificAlarms.forEach(action -> {
// if (AlarmStatusEnum.BJ.getCode() == action.getStatus()) {
// alarmLogs.add(addEquipAlarmLogRecord(action));
// if (ValidationUtil.isEmpty(action.getAlamContent())) {
// action.setAlamContent(action.getEquipmentSpecificName() + action.getEquipmentSpecificIndexName());
// }
// mqttSendGateway.sendToMqtt(TopicEnum.EQDQR.getTopic(), JSONArray.toJSON(action).toString());
// bool = Boolean.FALSE;
// } else {
// alarmLogs.addAll(upAlarmLogStatus(action.getIotCode(), action.getEquipmentSpecificIndexKey(), action.getTraceId(),
// equipmentSpecificAlarmLogService, false, ""));
// mqttSendGateway.sendToMqtt(TopicEnum.EQYQR.getTopic(), JSONArray.toJSON(action).toString());
// bool = Boolean.TRUE;
// }
// });
//
// // // 告警消息推送
// if (!ObjectUtils.isEmpty(equipmentSpecificAlarms)) {
// publishDataToDCCenterPage(equipmentSpecificIndexList);
// }
//
// //推送数据到组态大屏(消防系统)
// pushDataToIntegrationPage(equipmentSpecificIndexList);
// // 四横八纵遥测信号信息列表刷新
// publishNormalIndexValueToPage(equipmentSpecificIndexList);
// if ("zd".equals(system)) {
// System.out.println("站端系统----------------");
//
// // 向预控系统发送消息
// sendEquipSpecIndexToAutosysTopic(equipmentSpecificIndexList);
//
// // 首页性能指标数据订阅
// mqttSendGateway.sendToMqtt(indexTopic, JSON.toJSONString(indexStateList));
//
// // 组态大屏消息推送,设备表实时指标修改
// intePageSysDataRefresh(equipmentSpecificIndexList, topicEntity);
//
// // 数字换流站同步指标修改
// syncSpecificIndexsToGS(equipmentSpecificIndexList);
//
// // 则更新拓扑节点数据及告警状态
// updateNodeDateByEquipId(equipmentSpecificIndexList);
//
// // 向画布推送
// publishDataToCanvas(equipmentSpecificIndexList);
//
// // 向其他系统推送报警
// try {
// equipmentAlarmLogsToOtherSystems(alarmLogs);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
//
// if (equipmentSpecificVo.getEcode() != null) {
// String ecode = equipmentSpecificVo.getEcode();
// boolean flag = false;
//
// //消防泵
// String[] strings = pumpCodes.split(",");
// for (String string : strings) {
// if (ecode.startsWith(string)) {
// //通知>消防应急预案
// topicEntity.setType("xfb");
// mqttSendGateway.sendToMqtt(emergencyDisposalIndicators, JSONObject.toJSONString(topicEntity));
// flag = true;
// break;
// }
// }
//
// // 消防炮
// String[] stringxfp = monitorCodes.split(",");
// if (!flag) {
// for (String string1 : stringxfp) {
// if (ecode.startsWith(string1)) {
// //通知>消防应急预案
// topicEntity.setType("xfp");
// mqttSendGateway.sendToMqtt(emergencyDisposalIndicators, JSONObject.toJSONString(topicEntity));
// flag = true;
// break;
// }
// }
// }
// //消防水源
// if (!flag) {
// List<Map> lit = iEquipmentSpecificSerivce.getWater(equipmentSpecificVo.getId());
// if (lit != null && !lit.isEmpty()) {
// topicEntity.setType("xfsy");
// mqttSendGateway.sendToMqtt(emergencyDisposalIndicators, JSONObject.toJSONString(topicEntity));
// }
// }
// }
// }
// }
// });
// }
// });
}
......@@ -1069,7 +1069,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
indexStateList.add(createIndexStateVo(equipmentSpeIndex));
// 推送数据到组态大屏
pushDataToIntegrationPage(equipmentSpecificIndexList);
//pushDataToIntegrationPage(equipmentSpecificIndexList);
// 添加指标报告
saveEquipmentAlarmReportDay(equipmentSpeIndex);
......@@ -1189,9 +1189,9 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
String indexKey = iotDataVO.getKey();
String indexValue = iotDataVO.getValue().toString();
// 稳压泵启停信号处理
if (indexKey.equals(pressurePumpStart)) {
pressurePump(indexKey, indexValue, iotDatalist, topicEntity);
}
// if (indexKey.equals(pressurePumpStart)) {
// pressurePump(indexKey, indexValue, iotDatalist, topicEntity);
// }
});
equipmentSpecificAlarms.forEach(action -> {
......@@ -2381,338 +2381,338 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
// }
// }
private void pressurePump(String indexKey, String indexValue, List<IotDataVO> iotDatalist, TopicEntityVo topicEntity) {
List<String> listIndex = new ArrayList<>();
listIndex.add(pressurePumpStart);
// 获取全部启停泵信号
List<EquipmentSpecificIndex> equipmentSpeIndexList = equipmentSpecificIndexService.getEquipmentSpeIndexByIndex(listIndex);
List<PressurePumpEnum> enumListByCode = PressurePumpEnum.getEnumListByCode(indexValue);
if (TrueOrFalseEnum.fake.value.equals(indexValue)) {
String jobName = topicEntity.getIotCode() + "_" + indexKey;
String triggerName = PUMP_TRIGGER_NAME + "-" + topicEntity.getIotCode();
boolean b = QuartzManager.checkExists(jobName, PUMP_JOB_GROUP_NAME);
// 删除这个稳压泵的监听任务
if (b) {
QuartzManager.removeJob(jobName, PUMP_JOB_GROUP_NAME, triggerName, PUMP_TRIGGER_GROUP_NAME);
}
// 稳压泵漏水告警恢复
List<EquipmentSpecificIndex> collect = equipmentSpeIndexList.stream().filter(item -> !ObjectUtils.isEmpty(item.getIotCode()) && item.getIotCode().equals(topicEntity.getIotCode())).collect(Collectors.toList());
if (!ObjectUtils.isEmpty(collect) && !ObjectUtils.isEmpty(collect.get(0)) && !ObjectUtils.isEmpty(collect.get(0).getEquipmentId())) {
equipmentSpecificAlarmLogService.pressurePumpRestore(collect.get(0).getEquipmentId());
}
}
if (!CollectionUtils.isEmpty(enumListByCode)) {
enumListByCode.forEach(pressurePumpEnum -> {
// 1. 获取需要校验的值
PressurePumpValueEnum valueEnum = PressurePumpValueEnum.getByCode(pressurePumpEnum.getCompareValue());
assert valueEnum != null;
EquipmentSpecificIndex data = getPressurePumpDateByType(indexKey, valueEnum, topicEntity, equipmentSpeIndexList, pressurePumpEnum);
Date newDate = new Date();
// 2. 校验
if (!ObjectUtils.isEmpty(data.getUpdateDate())) {
checkValueByDate(data, newDate, pressurePumpEnum);
} else {
// 稳压泵漏水告警恢复
List<EquipmentSpecificIndex> collect = equipmentSpeIndexList.stream().filter(item -> !ObjectUtils.isEmpty(item.getIotCode()) && item.getIotCode().equals(topicEntity.getIotCode())).collect(Collectors.toList());
if (!ObjectUtils.isEmpty(collect) && !ObjectUtils.isEmpty(collect.get(0)) && !ObjectUtils.isEmpty(collect.get(0).getEquipmentId())) {
equipmentSpecificAlarmLogService.pressurePumpRestore(collect.get(0).getEquipmentId());
}
}
});
}
}
private EquipmentSpecificIndex getPressurePumpDateByType(String indexKey, PressurePumpValueEnum valueEnum, TopicEntityVo topicEntity, List<EquipmentSpecificIndex> equipmentSpeIndexList, PressurePumpEnum pressurePumpEnum) {
String iotCode = topicEntity.getIotCode();
EquipmentSpecificIndex equipmentSpecificIndex = new EquipmentSpecificIndex();
String prefix = null;
String suffix = null;
if (iotCode.length() > 8) {
prefix = iotCode.substring(0, 8);
suffix = iotCode.substring(8);
} else {
throw new BadRequest("装备物联编码错误,请确认!");
}
switch (valueEnum) {
case LAST_STOP:
List<EquipmentSpecificIndex> lastStop = equipmentSpeIndexList.stream().filter(e ->
StringUtil.isNotEmpty(e.getValue()) && e.getIotCode().equals(iotCode) && pressurePumpStart.equals(e.getEquipmentIndexKey())).sorted(Comparator.comparing(EquipmentSpecificIndex::getUpdateDate).reversed())
.collect(Collectors.toList());
if (!CollectionUtils.isEmpty(lastStop)) {
EquipmentSpecificIndex aFalse = getIotDate(equipmentSpecificIndex, lastStop, prefix, suffix, "false");
BeanUtils.copyProperties(aFalse, equipmentSpecificIndex);
}
break;
case LAST_START:
List<EquipmentSpecificIndex> lastStart = equipmentSpeIndexList.stream().filter(e ->
StringUtil.isNotEmpty(e.getValue()) && e.getIotCode().equals(iotCode) && pressurePumpStart.equals(e.getEquipmentIndexKey())).sorted(Comparator.comparing(EquipmentSpecificIndex::getUpdateDate).reversed())
.collect(Collectors.toList());
if (!CollectionUtils.isEmpty(lastStart)) {
EquipmentSpecificIndex aTrue = getIotDate(equipmentSpecificIndex, lastStart, prefix, suffix, "true");
BeanUtils.copyProperties(aTrue, equipmentSpecificIndex);
}
break;
case LATELY_STOP:
List<EquipmentSpecificIndex> latelyStop = equipmentSpeIndexList.stream().filter(e ->
StringUtil.isNotEmpty(e.getValue()) && pressurePumpStart.equals(e.getEquipmentIndexKey())).sorted(Comparator.comparing(EquipmentSpecificIndex::getUpdateDate).reversed())
.collect(Collectors.toList());
if (!CollectionUtils.isEmpty(latelyStop)) {
EquipmentSpecificIndex aFalse = getIotDate(equipmentSpecificIndex, latelyStop, prefix, null, "false");
BeanUtils.copyProperties(aFalse, equipmentSpecificIndex);
}
break;
case LATELY_START:
List<EquipmentSpecificIndex> latelyStart = equipmentSpeIndexList.stream().filter(e ->
StringUtil.isNotEmpty(e.getValue()) && e.getIotCode().equals(iotCode) && pressurePumpStart.equals(e.getEquipmentIndexKey())).sorted(Comparator.comparing(EquipmentSpecificIndex::getUpdateDate).reversed())
.collect(Collectors.toList());
if (!CollectionUtils.isEmpty(latelyStart)) {
EquipmentSpecificIndex aTrue = getIotDateExceptSelf(equipmentSpecificIndex, latelyStart, prefix, suffix, "true");
BeanUtils.copyProperties(aTrue, equipmentSpecificIndex);
}
break;
case PUMP_START_TIME:
startTimeCompute(indexKey, topicEntity, pressurePumpEnum);
break;
default:
break;
}
return equipmentSpecificIndex;
}
private EquipmentSpecificIndex getIotDate(EquipmentSpecificIndex equipmentSpecificIndex, List<EquipmentSpecificIndex> listData, String prefix, String suffix, String flag) {
ResponseModel start = iotFeign.selectOne(remoteSecurityService.getServerToken().getAppKey(), remoteSecurityService.getServerToken().getProduct(), remoteSecurityService.getServerToken().getToke(), "2", prefix, suffix, flag, pressurePumpStart);
if (200 == start.getStatus() && !ObjectUtils.isEmpty(start.getResult())) {
String json1 = JSON.toJSONString(start.getResult());
List<Map<String, String>> listObject1 = (List<Map<String, String>>) JSONArray.parse(json1);
List<Map<String, String>> collect = listObject1.stream().filter(t -> (t.containsKey("time"))).collect(Collectors.toList());
Date startDate = null;
if (1 < collect.size()) {
String startTime = collect.get(1).get("time").substring(0, 19).replace("T", " ");
startDate = DateUtils.dateAddHours(DateUtils.longStr2Date(startTime), +8);
}
listData.get(0).setUpdateDate(startDate);
BeanUtils.copyProperties(listData.get(0), equipmentSpecificIndex);
}
return equipmentSpecificIndex;
}
private EquipmentSpecificIndex getIotDateExceptSelf(EquipmentSpecificIndex equipmentSpecificIndex, List<EquipmentSpecificIndex> listData, String prefix, String suffix, String flag) {
ResponseModel start = iotFeign.selectOne(remoteSecurityService.getServerToken().getAppKey(), remoteSecurityService.getServerToken().getProduct(), remoteSecurityService.getServerToken().getToke(), "100", prefix, null, flag, pressurePumpStart + "," + "deviceName");
if (200 == start.getStatus() && !ObjectUtils.isEmpty(start.getResult())) {
String json1 = JSON.toJSONString(start.getResult());
List<Map<String, String>> listObject1 = (List<Map<String, String>>) JSONArray.parse(json1);
listObject1 = listObject1.stream().filter(x -> !suffix.equalsIgnoreCase(x.get("deviceName"))).collect(Collectors.toList());
List<Map<String, String>> collect = listObject1.stream().filter(t -> (t.containsKey("time"))).collect(Collectors.toList());
Date startDate = null;
if (0 < collect.size()) {
String startTime = collect.get(0).get("time").substring(0, 19).replace("T", " ");
startDate = DateUtils.dateAddHours(DateUtils.longStr2Date(startTime), +8);
}
listData.get(0).setUpdateDate(startDate);
BeanUtils.copyProperties(listData.get(0), equipmentSpecificIndex);
}
return equipmentSpecificIndex;
}
private void checkValueByDate(EquipmentSpecificIndex data, Date newDate, PressurePumpEnum pressurePumpEnum) {
String operator = pressurePumpEnum.getOperator();
PressurePumpCheckEnum pumpCheckEnum = PressurePumpCheckEnum.getByCode(operator);
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long diff = 0;
try {
long d1 = df.parse(df.format(data.getUpdateDate())).getTime();
long d2 = df.parse(df.format(newDate)).getTime();
diff = (d2 - d1) / 1000 / 60;
} catch (Exception e) {
log.error("时间转换失败" + e.getMessage());
return;
}
assert pumpCheckEnum != null;
String leftValue = pressurePumpEnum.getLeftValue();
String rightValue = pressurePumpEnum.getRightValue();
switch (pumpCheckEnum) {
case LE:
if (StringUtil.isNotEmpty(rightValue)) {
long value = Long.parseLong(rightValue);
if (diff <= value) {
sendMessagePressure(pressurePumpEnum, data);
}
}
break;
case GE:
if (StringUtil.isNotEmpty(leftValue)) {
long value = Long.parseLong(leftValue);
if (diff >= value) {
sendMessagePressure(pressurePumpEnum, data);
}
}
break;
case BE:
if (StringUtil.isNotEmpty(leftValue) && StringUtil.isNotEmpty(rightValue)) {
long lvalue = Long.parseLong(leftValue);
long rvalue = Long.parseLong(rightValue);
if (diff >= lvalue && diff <= rvalue) {
sendMessagePressure(pressurePumpEnum, data);
}
}
break;
default:
break;
}
}
private void sendMessagePressure(PressurePumpEnum pressurePumpEnum, EquipmentSpecificIndex data) {
String level = pressurePumpEnum.getLevel();
PressurePumpMessageEnum pumpMessageEnum = PressurePumpMessageEnum.getByCode(level);
assert pumpMessageEnum != null;
String time = new SimpleDateFormat(DateUtils.DATE_TIME_PATTERN).format(new Date());
MessageModel model = new MessageModel();
String allMessage = pumpMessageEnum.getAllMessage();
String body = "";
Map<String, String> map = new HashMap<>(4);
if (StringUtil.isNotEmpty(allMessage)) {
String value = StringUtil.isNotEmpty(pressurePumpEnum.getTips()) ? pressurePumpEnum.getTips() : "";
String content = String.format(allMessage, value);
map.put("content", content);
map.put("name", data.getEquipmentSpecificName());
map.put("time", time);
}
String recordMessage = pumpMessageEnum.getRecordMessage();
if (StringUtil.isNotEmpty(recordMessage)) {
String value = StringUtil.isNotEmpty(pressurePumpEnum.getTips()) ? pressurePumpEnum.getTips() : "";
body = String.format(recordMessage, data.getEquipmentSpecificName(), data.getLocation(), value);
}
String marqueeMessage = pumpMessageEnum.getMarqueeMessage();
if (StringUtil.isNotEmpty(marqueeMessage)) {
MarqueeData marqueeData = new MarqueeData();
marqueeData.setIsRead(0);
marqueeData.setEquipmentSpecificId(data.getEquipmentId());
marqueeData.setMessageType("pressurePump");
marqueeData.setName(data.getEquipmentSpecificName());
marqueeData.setPosition(data.getLocation());
marqueeData.setType("漏水提醒");
marqueeData.setCreateDate(new Date());
marqueeDataMapper.insert(marqueeData);
// 稳压泵告警事件生成
equipmentSpecificAlarmLogService.pressurePumpDisposeAlarm(data.getEquipmentId(), String.format(org.apache.commons.lang3.StringUtils.substring(recordMessage, 13), pressurePumpEnum.getTips()));
}
switch (pumpMessageEnum) {
case MESSAGE_LEVEL_YB:
case MESSAGE_LEVEL_YZ:
map.put("type", "稳压泵启停异常提醒");
model.setTitle("稳压泵启停异常提醒");
break;
case MESSAGE_LEVEL_QT_WJ:
case MESSAGE_LEVEL_QT_WJ_YXSC:
map.put("type", "漏水提醒");
model.setTitle("漏水提醒");
break;
default:
break;
}
model.setSendTime(new Date());
model.setBody(body);
model.setExtras(map);
model.setMsgType("pressurePump");
model.setIsSendApp(false);
model.setTerminal("WEB");
model.setIsSendWeb(true);
model.setCategory(1);
List<String> receive = new ArrayList<>();
receive.add("system");
model.setRelationId(String.valueOf(data.getEquipmentId()));
model.setRecivers(receive);
Token token = remoteSecurityService.getServerToken();
systemctlFeign.create(token.getAppKey(), token.getProduct(), token.getToke(), model);
}
private void startTimeCompute(String indexKey, TopicEntityVo topicEntity, PressurePumpEnum pressurePumpEnum) {
String jobName = topicEntity.getIotCode() + "_" + indexKey;
String cron = "";
String triggerName = PUMP_TRIGGER_NAME + "-" + topicEntity.getIotCode();
// private void pressurePump(String indexKey, String indexValue, List<IotDataVO> iotDatalist, TopicEntityVo topicEntity) {
// List<String> listIndex = new ArrayList<>();
// listIndex.add(pressurePumpStart);
// // 获取全部启停泵信号
// List<EquipmentSpecificIndex> equipmentSpeIndexList = equipmentSpecificIndexService.getEquipmentSpeIndexByIndex(listIndex);
// List<PressurePumpEnum> enumListByCode = PressurePumpEnum.getEnumListByCode(indexValue);
//
// if (TrueOrFalseEnum.fake.value.equals(indexValue)) {
// String jobName = topicEntity.getIotCode() + "_" + indexKey;
// String triggerName = PUMP_TRIGGER_NAME + "-" + topicEntity.getIotCode();
// boolean b = QuartzManager.checkExists(jobName, PUMP_JOB_GROUP_NAME);
// // 删除这个稳压泵的监听任务
// if (b) {
// QuartzManager.removeJob(jobName, PUMP_JOB_GROUP_NAME, triggerName, PUMP_TRIGGER_GROUP_NAME);
// }
//
// // 稳压泵漏水告警恢复
// List<EquipmentSpecificIndex> collect = equipmentSpeIndexList.stream().filter(item -> !ObjectUtils.isEmpty(item.getIotCode()) && item.getIotCode().equals(topicEntity.getIotCode())).collect(Collectors.toList());
// if (!ObjectUtils.isEmpty(collect) && !ObjectUtils.isEmpty(collect.get(0)) && !ObjectUtils.isEmpty(collect.get(0).getEquipmentId())) {
// equipmentSpecificAlarmLogService.pressurePumpRestore(collect.get(0).getEquipmentId());
// }
// }
// if (!CollectionUtils.isEmpty(enumListByCode)) {
// enumListByCode.forEach(pressurePumpEnum -> {
// // 1. 获取需要校验的值
// PressurePumpValueEnum valueEnum = PressurePumpValueEnum.getByCode(pressurePumpEnum.getCompareValue());
// assert valueEnum != null;
// EquipmentSpecificIndex data = getPressurePumpDateByType(indexKey, valueEnum, topicEntity, equipmentSpeIndexList, pressurePumpEnum);
// Date newDate = new Date();
// // 2. 校验
// if (!ObjectUtils.isEmpty(data.getUpdateDate())) {
// checkValueByDate(data, newDate, pressurePumpEnum);
// } else {
// // 稳压泵漏水告警恢复
// List<EquipmentSpecificIndex> collect = equipmentSpeIndexList.stream().filter(item -> !ObjectUtils.isEmpty(item.getIotCode()) && item.getIotCode().equals(topicEntity.getIotCode())).collect(Collectors.toList());
// if (!ObjectUtils.isEmpty(collect) && !ObjectUtils.isEmpty(collect.get(0)) && !ObjectUtils.isEmpty(collect.get(0).getEquipmentId())) {
// equipmentSpecificAlarmLogService.pressurePumpRestore(collect.get(0).getEquipmentId());
// }
// }
// });
// }
// }
if ("FHS_PressurePump_Start_ALONE_START_YXSC".equals(pressurePumpEnum.getCode())) {
// private EquipmentSpecificIndex getPressurePumpDateByType(String indexKey, PressurePumpValueEnum valueEnum, TopicEntityVo topicEntity, List<EquipmentSpecificIndex> equipmentSpeIndexList, PressurePumpEnum pressurePumpEnum) {
// String iotCode = topicEntity.getIotCode();
// EquipmentSpecificIndex equipmentSpecificIndex = new EquipmentSpecificIndex();
// String prefix = null;
// String suffix = null;
// if (iotCode.length() > 8) {
// prefix = iotCode.substring(0, 8);
// suffix = iotCode.substring(8);
// } else {
// throw new BadRequest("装备物联编码错误,请确认!");
// }
//
// switch (valueEnum) {
// case LAST_STOP:
// List<EquipmentSpecificIndex> lastStop = equipmentSpeIndexList.stream().filter(e ->
// StringUtil.isNotEmpty(e.getValue()) && e.getIotCode().equals(iotCode) && pressurePumpStart.equals(e.getEquipmentIndexKey())).sorted(Comparator.comparing(EquipmentSpecificIndex::getUpdateDate).reversed())
// .collect(Collectors.toList());
// if (!CollectionUtils.isEmpty(lastStop)) {
// EquipmentSpecificIndex aFalse = getIotDate(equipmentSpecificIndex, lastStop, prefix, suffix, "false");
// BeanUtils.copyProperties(aFalse, equipmentSpecificIndex);
// }
// break;
// case LAST_START:
// List<EquipmentSpecificIndex> lastStart = equipmentSpeIndexList.stream().filter(e ->
// StringUtil.isNotEmpty(e.getValue()) && e.getIotCode().equals(iotCode) && pressurePumpStart.equals(e.getEquipmentIndexKey())).sorted(Comparator.comparing(EquipmentSpecificIndex::getUpdateDate).reversed())
// .collect(Collectors.toList());
// if (!CollectionUtils.isEmpty(lastStart)) {
// EquipmentSpecificIndex aTrue = getIotDate(equipmentSpecificIndex, lastStart, prefix, suffix, "true");
// BeanUtils.copyProperties(aTrue, equipmentSpecificIndex);
// }
// break;
// case LATELY_STOP:
// List<EquipmentSpecificIndex> latelyStop = equipmentSpeIndexList.stream().filter(e ->
// StringUtil.isNotEmpty(e.getValue()) && pressurePumpStart.equals(e.getEquipmentIndexKey())).sorted(Comparator.comparing(EquipmentSpecificIndex::getUpdateDate).reversed())
// .collect(Collectors.toList());
// if (!CollectionUtils.isEmpty(latelyStop)) {
// EquipmentSpecificIndex aFalse = getIotDate(equipmentSpecificIndex, latelyStop, prefix, null, "false");
// BeanUtils.copyProperties(aFalse, equipmentSpecificIndex);
//
// }
// break;
// case LATELY_START:
// List<EquipmentSpecificIndex> latelyStart = equipmentSpeIndexList.stream().filter(e ->
// StringUtil.isNotEmpty(e.getValue()) && e.getIotCode().equals(iotCode) && pressurePumpStart.equals(e.getEquipmentIndexKey())).sorted(Comparator.comparing(EquipmentSpecificIndex::getUpdateDate).reversed())
// .collect(Collectors.toList());
// if (!CollectionUtils.isEmpty(latelyStart)) {
// EquipmentSpecificIndex aTrue = getIotDateExceptSelf(equipmentSpecificIndex, latelyStart, prefix, suffix, "true");
// BeanUtils.copyProperties(aTrue, equipmentSpecificIndex);
// }
// break;
// case PUMP_START_TIME:
// startTimeCompute(indexKey, topicEntity, pressurePumpEnum);
// break;
// default:
// break;
// }
// return equipmentSpecificIndex;
// }
Calendar time = Calendar.getInstance();
time.add(Calendar.MINUTE, 5);
cron = time.get(Calendar.SECOND) + " " + time.get(Calendar.MINUTE) + "/5 * * * ?";
} else {
cron = pressurePumpEnum.getLeftValue();
}
// private EquipmentSpecificIndex getIotDate(EquipmentSpecificIndex equipmentSpecificIndex, List<EquipmentSpecificIndex> listData, String prefix, String suffix, String flag) {
// ResponseModel start = iotFeign.selectOne(remoteSecurityService.getServerToken().getAppKey(), remoteSecurityService.getServerToken().getProduct(), remoteSecurityService.getServerToken().getToke(), "2", prefix, suffix, flag, pressurePumpStart);
// if (200 == start.getStatus() && !ObjectUtils.isEmpty(start.getResult())) {
// String json1 = JSON.toJSONString(start.getResult());
// List<Map<String, String>> listObject1 = (List<Map<String, String>>) JSONArray.parse(json1);
// List<Map<String, String>> collect = listObject1.stream().filter(t -> (t.containsKey("time"))).collect(Collectors.toList());
// Date startDate = null;
// if (1 < collect.size()) {
// String startTime = collect.get(1).get("time").substring(0, 19).replace("T", " ");
// startDate = DateUtils.dateAddHours(DateUtils.longStr2Date(startTime), +8);
// }
// listData.get(0).setUpdateDate(startDate);
// BeanUtils.copyProperties(listData.get(0), equipmentSpecificIndex);
// }
// return equipmentSpecificIndex;
// }
EquipmentSpecific equipmentSpecific = null;
try {
LambdaQueryWrapper<EquipmentSpecific> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(EquipmentSpecific::getIotCode, topicEntity.getIotCode());
equipmentSpecific = equipmentSpecificMapper.selectOne(wrapper);
} catch (Exception e) {
log.error("根据iotCod查询失败" + topicEntity.getIotCode());
}
boolean b = QuartzManager.checkExists(jobName, PUMP_JOB_GROUP_NAME);
if (indexKey.equals(pressurePumpStart)) {
if (b) {
// 任务存在 更新时间
QuartzManager.modifyJobTime(triggerName, PUMP_TRIGGER_GROUP_NAME, cron);
} else {
QuartzManager.removeJob(jobName, PUMP_JOB_GROUP_NAME, triggerName, PUMP_TRIGGER_GROUP_NAME);
// 任务不存在,新增
// 传参
if (ObjectUtils.isEmpty(equipmentSpecific)) {
return;
}
Map<String, Object> parameter = new HashMap<>(6);
parameter.put("jobName", jobName);
parameter.put("triggerName", triggerName);
parameter.put("triggerGroupName", PUMP_TRIGGER_GROUP_NAME);
parameter.put("jobGroupName", PUMP_JOB_GROUP_NAME);
parameter.put("equipmentSpecific", equipmentSpecific);
parameter.put("pressurePumpEnum", pressurePumpEnum);
parameter.put("remoteSecurityService", remoteSecurityService);
parameter.put("systemctlFeign", systemctlFeign);
parameter.put("marqueeDataMapper", marqueeDataMapper);
parameter.put("equipmentSpecificAlarmLogService", equipmentSpecificAlarmLogService);
QuartzManager.addJob(jobName, PUMP_JOB_GROUP_NAME, triggerName, PUMP_TRIGGER_GROUP_NAME, PumpSendMessage.class, cron, parameter);
}
}
}
// private EquipmentSpecificIndex getIotDateExceptSelf(EquipmentSpecificIndex equipmentSpecificIndex, List<EquipmentSpecificIndex> listData, String prefix, String suffix, String flag) {
// ResponseModel start = iotFeign.selectOne(remoteSecurityService.getServerToken().getAppKey(), remoteSecurityService.getServerToken().getProduct(), remoteSecurityService.getServerToken().getToke(), "100", prefix, null, flag, pressurePumpStart + "," + "deviceName");
// if (200 == start.getStatus() && !ObjectUtils.isEmpty(start.getResult())) {
// String json1 = JSON.toJSONString(start.getResult());
// List<Map<String, String>> listObject1 = (List<Map<String, String>>) JSONArray.parse(json1);
// listObject1 = listObject1.stream().filter(x -> !suffix.equalsIgnoreCase(x.get("deviceName"))).collect(Collectors.toList());
// List<Map<String, String>> collect = listObject1.stream().filter(t -> (t.containsKey("time"))).collect(Collectors.toList());
// Date startDate = null;
// if (0 < collect.size()) {
// String startTime = collect.get(0).get("time").substring(0, 19).replace("T", " ");
// startDate = DateUtils.dateAddHours(DateUtils.longStr2Date(startTime), +8);
// }
// listData.get(0).setUpdateDate(startDate);
// BeanUtils.copyProperties(listData.get(0), equipmentSpecificIndex);
// }
// return equipmentSpecificIndex;
// }
// private void checkValueByDate(EquipmentSpecificIndex data, Date newDate, PressurePumpEnum pressurePumpEnum) {
// String operator = pressurePumpEnum.getOperator();
// PressurePumpCheckEnum pumpCheckEnum = PressurePumpCheckEnum.getByCode(operator);
// SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// long diff = 0;
// try {
// long d1 = df.parse(df.format(data.getUpdateDate())).getTime();
// long d2 = df.parse(df.format(newDate)).getTime();
// diff = (d2 - d1) / 1000 / 60;
// } catch (Exception e) {
// log.error("时间转换失败" + e.getMessage());
// return;
// }
// assert pumpCheckEnum != null;
// String leftValue = pressurePumpEnum.getLeftValue();
// String rightValue = pressurePumpEnum.getRightValue();
// switch (pumpCheckEnum) {
// case LE:
// if (StringUtil.isNotEmpty(rightValue)) {
// long value = Long.parseLong(rightValue);
// if (diff <= value) {
// sendMessagePressure(pressurePumpEnum, data);
// }
// }
// break;
// case GE:
// if (StringUtil.isNotEmpty(leftValue)) {
// long value = Long.parseLong(leftValue);
// if (diff >= value) {
// sendMessagePressure(pressurePumpEnum, data);
// }
// }
// break;
// case BE:
// if (StringUtil.isNotEmpty(leftValue) && StringUtil.isNotEmpty(rightValue)) {
// long lvalue = Long.parseLong(leftValue);
// long rvalue = Long.parseLong(rightValue);
// if (diff >= lvalue && diff <= rvalue) {
// sendMessagePressure(pressurePumpEnum, data);
// }
// }
// break;
// default:
// break;
// }
// }
@Async
public void pushDataToIntegrationPage(List<EquipmentSpecificIndex> specificIndices) {
for (EquipmentSpecificIndex specificIndex : specificIndices) {
Long equipmentSpecificId = specificIndex.getEquipmentSpecificId();
List<EquipmentSpecificIndex> equipmentSpecificIndices = equipmentSpecificIndexMapper.selectList(
Wrappers.<EquipmentSpecificIndex>lambdaQuery()
.eq(EquipmentSpecificIndex::getEquipmentSpecificId, equipmentSpecificId)
.isNotNull(EquipmentSpecificIndex::getValue)
.ne(EquipmentSpecificIndex::getValue, "")
.orderByDesc(EquipmentSpecificIndex::getEmergencyLevel)
);
List<HashMap<String, String>> valuedIndexes = equipmentSpecificIndices.stream().map(index -> new HashMap<String, String>() {{
put("key", index.getEquipmentIndexKey());
put("value", index.getValue());
}}).collect(Collectors.toList());
JSONObject message1 = new JSONObject() {{
put("code", String.valueOf(specificIndex.getId()));
put("status", specificIndex.getValue());
put("value", specificIndex.getValue());
}};
JSONObject message2 = new JSONObject() {{
put("code", String.valueOf(specificIndex.getEquipmentSpecificId()));
put("valuedIndexes", valuedIndexes);
}};
mqttSendGateway.sendToMqtt(ConfigPageTopicEnum.EQUIP_INDICATOR.getTopic(), message1.toJSONString());
mqttSendGateway.sendToMqtt(ConfigPageTopicEnum.EQUIP_MULTI_INDICATOR.getTopic(), message2.toJSONString());
}
}
// private void sendMessagePressure(PressurePumpEnum pressurePumpEnum, EquipmentSpecificIndex data) {
// String level = pressurePumpEnum.getLevel();
// PressurePumpMessageEnum pumpMessageEnum = PressurePumpMessageEnum.getByCode(level);
// assert pumpMessageEnum != null;
// String time = new SimpleDateFormat(DateUtils.DATE_TIME_PATTERN).format(new Date());
// MessageModel model = new MessageModel();
// String allMessage = pumpMessageEnum.getAllMessage();
// String body = "";
// Map<String, String> map = new HashMap<>(4);
// if (StringUtil.isNotEmpty(allMessage)) {
// String value = StringUtil.isNotEmpty(pressurePumpEnum.getTips()) ? pressurePumpEnum.getTips() : "";
// String content = String.format(allMessage, value);
// map.put("content", content);
// map.put("name", data.getEquipmentSpecificName());
// map.put("time", time);
// }
// String recordMessage = pumpMessageEnum.getRecordMessage();
// if (StringUtil.isNotEmpty(recordMessage)) {
// String value = StringUtil.isNotEmpty(pressurePumpEnum.getTips()) ? pressurePumpEnum.getTips() : "";
// body = String.format(recordMessage, data.getEquipmentSpecificName(), data.getLocation(), value);
// }
// String marqueeMessage = pumpMessageEnum.getMarqueeMessage();
// if (StringUtil.isNotEmpty(marqueeMessage)) {
// MarqueeData marqueeData = new MarqueeData();
// marqueeData.setIsRead(0);
// marqueeData.setEquipmentSpecificId(data.getEquipmentId());
// marqueeData.setMessageType("pressurePump");
// marqueeData.setName(data.getEquipmentSpecificName());
// marqueeData.setPosition(data.getLocation());
// marqueeData.setType("漏水提醒");
// marqueeData.setCreateDate(new Date());
// marqueeDataMapper.insert(marqueeData);
// // 稳压泵告警事件生成
// equipmentSpecificAlarmLogService.pressurePumpDisposeAlarm(data.getEquipmentId(), String.format(org.apache.commons.lang3.StringUtils.substring(recordMessage, 13), pressurePumpEnum.getTips()));
// }
//
// switch (pumpMessageEnum) {
// case MESSAGE_LEVEL_YB:
// case MESSAGE_LEVEL_YZ:
// map.put("type", "稳压泵启停异常提醒");
// model.setTitle("稳压泵启停异常提醒");
// break;
// case MESSAGE_LEVEL_QT_WJ:
// case MESSAGE_LEVEL_QT_WJ_YXSC:
// map.put("type", "漏水提醒");
// model.setTitle("漏水提醒");
// break;
// default:
// break;
// }
//
// model.setSendTime(new Date());
// model.setBody(body);
// model.setExtras(map);
// model.setMsgType("pressurePump");
// model.setIsSendApp(false);
// model.setTerminal("WEB");
// model.setIsSendWeb(true);
// model.setCategory(1);
// List<String> receive = new ArrayList<>();
// receive.add("system");
// model.setRelationId(String.valueOf(data.getEquipmentId()));
// model.setRecivers(receive);
// Token token = remoteSecurityService.getServerToken();
// systemctlFeign.create(token.getAppKey(), token.getProduct(), token.getToke(), model);
// }
//
// private void startTimeCompute(String indexKey, TopicEntityVo topicEntity, PressurePumpEnum pressurePumpEnum) {
// String jobName = topicEntity.getIotCode() + "_" + indexKey;
//
//
// String cron = "";
// String triggerName = PUMP_TRIGGER_NAME + "-" + topicEntity.getIotCode();
//
// if ("FHS_PressurePump_Start_ALONE_START_YXSC".equals(pressurePumpEnum.getCode())) {
//
// Calendar time = Calendar.getInstance();
// time.add(Calendar.MINUTE, 5);
// cron = time.get(Calendar.SECOND) + " " + time.get(Calendar.MINUTE) + "/5 * * * ?";
//
// } else {
// cron = pressurePumpEnum.getLeftValue();
// }
//
// EquipmentSpecific equipmentSpecific = null;
// try {
// LambdaQueryWrapper<EquipmentSpecific> wrapper = new LambdaQueryWrapper<>();
// wrapper.eq(EquipmentSpecific::getIotCode, topicEntity.getIotCode());
// equipmentSpecific = equipmentSpecificMapper.selectOne(wrapper);
// } catch (Exception e) {
// log.error("根据iotCod查询失败" + topicEntity.getIotCode());
// }
// boolean b = QuartzManager.checkExists(jobName, PUMP_JOB_GROUP_NAME);
// if (indexKey.equals(pressurePumpStart)) {
// if (b) {
// // 任务存在 更新时间
// QuartzManager.modifyJobTime(triggerName, PUMP_TRIGGER_GROUP_NAME, cron);
// } else {
// QuartzManager.removeJob(jobName, PUMP_JOB_GROUP_NAME, triggerName, PUMP_TRIGGER_GROUP_NAME);
// // 任务不存在,新增
// // 传参
// if (ObjectUtils.isEmpty(equipmentSpecific)) {
// return;
// }
// Map<String, Object> parameter = new HashMap<>(6);
// parameter.put("jobName", jobName);
// parameter.put("triggerName", triggerName);
// parameter.put("triggerGroupName", PUMP_TRIGGER_GROUP_NAME);
// parameter.put("jobGroupName", PUMP_JOB_GROUP_NAME);
// parameter.put("equipmentSpecific", equipmentSpecific);
// parameter.put("pressurePumpEnum", pressurePumpEnum);
// parameter.put("remoteSecurityService", remoteSecurityService);
// parameter.put("systemctlFeign", systemctlFeign);
// parameter.put("marqueeDataMapper", marqueeDataMapper);
// parameter.put("equipmentSpecificAlarmLogService", equipmentSpecificAlarmLogService);
// QuartzManager.addJob(jobName, PUMP_JOB_GROUP_NAME, triggerName, PUMP_TRIGGER_GROUP_NAME, PumpSendMessage.class, cron, parameter);
// }
// }
// }
//
//
// @Async
// public void pushDataToIntegrationPage(List<EquipmentSpecificIndex> specificIndices) {
// for (EquipmentSpecificIndex specificIndex : specificIndices) {
// Long equipmentSpecificId = specificIndex.getEquipmentSpecificId();
// List<EquipmentSpecificIndex> equipmentSpecificIndices = equipmentSpecificIndexMapper.selectList(
// Wrappers.<EquipmentSpecificIndex>lambdaQuery()
// .eq(EquipmentSpecificIndex::getEquipmentSpecificId, equipmentSpecificId)
// .isNotNull(EquipmentSpecificIndex::getValue)
// .ne(EquipmentSpecificIndex::getValue, "")
// .orderByDesc(EquipmentSpecificIndex::getEmergencyLevel)
// );
// List<HashMap<String, String>> valuedIndexes = equipmentSpecificIndices.stream().map(index -> new HashMap<String, String>() {{
// put("key", index.getEquipmentIndexKey());
// put("value", index.getValue());
// }}).collect(Collectors.toList());
// JSONObject message1 = new JSONObject() {{
// put("code", String.valueOf(specificIndex.getId()));
// put("status", specificIndex.getValue());
// put("value", specificIndex.getValue());
// }};
// JSONObject message2 = new JSONObject() {{
// put("code", String.valueOf(specificIndex.getEquipmentSpecificId()));
// put("valuedIndexes", valuedIndexes);
// }};
// mqttSendGateway.sendToMqtt(ConfigPageTopicEnum.EQUIP_INDICATOR.getTopic(), message1.toJSONString());
// mqttSendGateway.sendToMqtt(ConfigPageTopicEnum.EQUIP_MULTI_INDICATOR.getTopic(), message2.toJSONString());
// }
// }
}
......@@ -81,6 +81,7 @@ public class LoginController {
dPasswordAuthModel.setLoginId(userId);
dPasswordAuthModel.setPassword(DesUtil.encode(password, "qaz"));
RequestContext.setProduct(product);
System.out.println("================================getBindEquipment");
FeignClientResult feignClientResult = Privilege.authClient.idpassword(dPasswordAuthModel);
if (ObjectUtils.isEmpty(feignClientResult.getResult())){
throw new Exception("缺失登录信息");
......
......@@ -170,6 +170,7 @@ public class RemoteSecurityService {
private Toke getLogin(IdPasswordAuthModel dPasswordAuthModel){
Toke toke = new Toke();
RequestContext.setProduct(productWeb);
System.out.println("================================fas");
FeignClientResult feignClientResult = Privilege.authClient.idpassword(dPasswordAuthModel);
Map map = (Map) feignClientResult.getResult();
if(map!=null){
......
......@@ -113,6 +113,7 @@ public class RemoteSecurityService {
private Toke getLogin(IdPasswordAuthModel dPasswordAuthModel) {
Toke toke = new Toke();
RequestContext.setProduct(productWeb);
System.out.println("================================latent");
FeignClientResult feignClientResult = Privilege.authClient.idpassword(dPasswordAuthModel);
Map map = (Map) feignClientResult.getResult();
if (map != null) {
......@@ -532,6 +533,7 @@ public class RemoteSecurityService {
RequestContext.setProduct(productApp);
Toke oked = new Toke();
try {
System.out.println("================================latentdanger2");
feignClientResult = Privilege.authClient.idpassword(dPasswordAuthModel);
map = (Map) feignClientResult.getResult();
map.put("appKey", appKey);
......
......@@ -114,6 +114,7 @@ public class RemoteSecurityService {
private Toke getLogin(IdPasswordAuthModel dPasswordAuthModel) {
Toke toke = new Toke();
RequestContext.setProduct(productWeb);
System.out.println("================================maintenance");
FeignClientResult feignClientResult = Privilege.authClient.idpassword(dPasswordAuthModel);
Map map = (Map) feignClientResult.getResult();
if (map != null) {
......@@ -541,6 +542,7 @@ public class RemoteSecurityService {
RequestContext.setProduct(productApp);
Toke oked = new Toke();
try {
System.out.println("================================maintenance2");
feignClientResult = Privilege.authClient.idpassword(dPasswordAuthModel);
map = (Map) feignClientResult.getResult();
map.put("appKey", appKey);
......
......@@ -116,6 +116,7 @@ public class RemoteSecurityService {
private Toke getLogin(IdPasswordAuthModel dPasswordAuthModel) {
Toke toke = new Toke();
RequestContext.setProduct(productWeb);
System.out.println("================================patrol");
FeignClientResult feignClientResult = Privilege.authClient.idpassword(dPasswordAuthModel);
Map map = (Map) feignClientResult.getResult();
if (map != null) {
......@@ -535,6 +536,7 @@ public class RemoteSecurityService {
RequestContext.setProduct(productApp);
Toke oked = new Toke();
try {
System.out.println("================================patrol2");
feignClientResult = Privilege.authClient.idpassword(dPasswordAuthModel);
map = (Map) feignClientResult.getResult();
map.put("appKey", appKey);
......
......@@ -117,6 +117,7 @@ public class RemoteSecurityService {
private Toke getLogin(IdPasswordAuthModel dPasswordAuthModel) {
Toke toke = new Toke();
RequestContext.setProduct(productWeb);
System.out.println("================================supervision");
FeignClientResult feignClientResult = Privilege.authClient.idpassword(dPasswordAuthModel);
Map map = (Map) feignClientResult.getResult();
if (map != null) {
......@@ -544,6 +545,7 @@ public class RemoteSecurityService {
RequestContext.setProduct(productApp);
Toke oked = new Toke();
try {
System.out.println("================================supervision2");
feignClientResult = Privilege.authClient.idpassword(dPasswordAuthModel);
map = (Map) feignClientResult.getResult();
map.put("appKey", appKey);
......
......@@ -85,9 +85,9 @@ public class JobService implements IJobService {
public void initScheduler() {
// TODO Auto-generated method stub
log.debug("======================initScheduler===========================");
initTaskJob();
initPlanTaskJob();
initMsgJob();
//initTaskJob();
//initPlanTaskJob();
//initMsgJob();
}
@Override
......
......@@ -12,9 +12,9 @@ public class PlanTaskJobService implements Job {
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
setJobService((IJobService) jobDataMap.get("jobService"));
jobService.planTaskJobPerform(Long.valueOf(jobDataMap.get("id").toString()),jobDataMap.get("jobType").toString(),jobDataMap.get("jobName").toString());
//JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
//setJobService((IJobService) jobDataMap.get("jobService"));
//jobService.planTaskJobPerform(Long.valueOf(jobDataMap.get("id").toString()),jobDataMap.get("jobType").toString(),jobDataMap.get("jobName").toString());
}
public IJobService getJobService() {
......
......@@ -330,6 +330,7 @@ public class WechatController extends BaseController {
IdPasswordAuthModel loninData = new IdPasswordAuthModel();
loninData.setLoginId(model.getPhone());
loninData.setPassword(passwd);
System.out.println("================================wechat");
FeignClientResult loginResult = Privilege.authClient.idpassword(loninData);
if(loginResult.getStatus() == 200) {
HashMap resultMap = (HashMap) loginResult.getResult();
......
......@@ -151,6 +151,7 @@ public class TzsAuthServiceImpl implements TzsAuthService {
IdPasswordAuthModel loninData = new IdPasswordAuthModel();
loninData.setLoginId(ctiUserName);
loninData.setPassword(passwd);
System.out.println("================================tzs.loginCtiUser");
FeignClientResult loginResult = Privilege.authClient.idpassword(loninData);
if(loginResult.getStatus() == 200) {
HashMap resultMap = (HashMap) loginResult.getResult();
......
......@@ -22,6 +22,6 @@ public class CylinderSchedulerJob {
*/
@Scheduled(cron = "${cylinder-early-warning-cron:0 0 9 * * ?}")
public void dayReport() {
scheduleService.calEarlyWarningLevel();
// scheduleService.calEarlyWarningLevel();
}
}
......@@ -333,6 +333,10 @@
<modules>
<module>amos-boot-module</module>
<module>amos-boot-biz-common</module>
<module>amos-boot-data</module>
<module>amos-boot-core</module>
<module>amos-boot-utils</module>
<!--
<module>amos-boot-system-tzs</module>
<module>amos-boot-system-jcs</module>
<module>amos-boot-system-knowledgebase</module>
......@@ -340,16 +344,16 @@
<module>amos-boot-system-patrol</module>
<module>amos-boot-system-maintenance</module>
<module>amos-boot-system-supervision</module>
-->
<module>amos-boot-system-equip</module>
<module>amos-boot-core</module>
<module>amos-boot-utils</module>
<!--
<module>amos-boot-system-latentdanger</module>
<module>amos-boot-system-ccs</module>
<module>amos-boot-data</module>
<module>amos-boot-system-precontrol</module>
<module>amos-boot-system-cas</module>
<module>amos-boot-system-tdc</module>
<module>amos-boot-system-kgd</module>
-->
</modules>
<build>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment