Commit eb386f06 authored by 刘林's avatar 刘林

Merge branch 'develop_dl_3.7.0.9' into develop_dl

# Conflicts: # amos-boot-module/amos-boot-module-biz/amos-boot-module-equip-biz/src/main/java/com/yeejoin/equipmanage/service/impl/MqttReceiveServiceImpl.java # amos-boot-system-equip/src/main/resources/application-dev.properties
parents 5282006a 501bbca5
......@@ -56,6 +56,12 @@
<artifactId>amos-component-security</artifactId>
<version>1.7.13-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.yeejoin</groupId>
<artifactId>amos-component-influxdb</artifactId>
<version>1.8.5-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>
......@@ -1714,16 +1714,18 @@ public class EquipmentSpecificSerivceImpl extends ServiceImpl<EquipmentSpecificM
@Override
public void updateEquipmentSpecIndexRealtimeData(EquipmentSpecificIndex index) {
// TODO Auto-generated method stub
if (!ObjectUtils.isEmpty(index)) {
if (!ObjectUtils.isEmpty(index.getEquipmentSpecificId())) {
EquipmentSpecific es = equipmentSpecificMapper.selectById(index.getEquipmentSpecificId());
es.setRealtimeIotEsIndexId(index.getId());
es.setRealtimeIotIndexKey(index.getNameKey());
es.setRealtimeIotIndexName(index.getEquipmentSpecificIndexName());
es.setRealtimeIotIndexValue(index.getValue());
es.setRealtimeIotIndexId(index.getEquipmentIndexId());
es.setRealtimeIotIndexUpdateDate(index.getUpdateDate());
es.setValueLabel(index.getValueLabel());
equipmentSpecificMapper.updateById(es);
if(!ObjectUtils.isEmpty(es)){
es.setRealtimeIotEsIndexId(index.getId());
es.setRealtimeIotIndexKey(index.getNameKey());
es.setRealtimeIotIndexName(index.getEquipmentSpecificIndexName());
es.setRealtimeIotIndexValue(index.getValue());
es.setRealtimeIotIndexId(index.getEquipmentIndexId());
es.setRealtimeIotIndexUpdateDate(index.getUpdateDate());
es.setValueLabel(index.getValueLabel());
equipmentSpecificMapper.updateById(es);
}
}
}
......
......@@ -7,6 +7,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.yeejoin.amos.boot.biz.common.utils.RedisUtils;
import com.yeejoin.amos.feign.privilege.model.DepartmentModel;
import com.yeejoin.amos.component.influxdb.InfluxDbConnection;
import com.yeejoin.amos.feign.systemctl.model.MessageModel;
import com.yeejoin.equipmanage.common.datasync.entity.FireEquipmentDefectAlarm;
import com.yeejoin.equipmanage.common.datasync.entity.FireEquipmentFaultAlarm;
......@@ -118,6 +119,10 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
//消防炮
@Value("${equipment.plan.monitor}")
String monitorCodes;
@Autowired
private InfluxDbConnection influxDbConnection;
/**
* 泡沫罐KEY
*/
......@@ -628,28 +633,27 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
@Override
@Transactional(rollbackFor = Exception.class)
public void handlerMqttIotMessage(String topic, String message) {
//influxdb
Map<String, String> tagsMap = new HashMap<>();
Map<String, Object> fieldsMap = new HashMap<>();
Map<String, Object> fieldsStrMap = new HashMap<>();
log.info("接收到iot消息: {}", message);
TopicEntityVo topicEntity = new TopicEntityVo();
topicEntity.setTopic(topic);
topicEntity.setMessage(message);
List<IotDataVO> iotDatalist = new ArrayList<>();
List<EquipmentSpecificIndex> equipmentSpecificIndexList = new ArrayList<>();
List<EquipmentSpecificAlarm> equipmentSpecificAlarms = new ArrayList<>();
List<IndexStateVo> indexStateList = new ArrayList<>();
JSONObject jsonObject = JSONObject.parseObject(message);
String dataType = jsonObject.getString("dataType");
String indexAddress = jsonObject.getString("address");
String traceId = jsonObject.getString("traceId");
String deviceCode = jsonObject.getString("deviceCode");
String gatewayId = jsonObject.getString("gatewayId");
String value;
//如果消息是遥信类型,进行指标转换
if(dataType != null && dataType.equals("state")){
value = jsonObject.getString("value");
}else{
value = jsonObject.getInteger("value") ==1 ? "true":"false";
}
String value=jsonObject.getString("value");
EquipmentSpecificIndex equipmentSpeIndex = equipmentSpecificIndexService.getEquipmentSpeIndexByIndexAddress(indexAddress,gatewayId);
if (equipmentSpeIndex == null){
......@@ -664,10 +668,25 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
equipmentSpeIndex.setDataType(dataType);
equipmentSpeIndex.setTraceId(traceId);
equipmentSpeIndex.setUUID(UUIDUtils.getUUID());
IotDataVO iotDataVO = new IotDataVO();
iotDataVO.setKey(equipmentSpeIndex.getNameKey());
iotDataVO.setValue(value);
iotDatalist.add(iotDataVO);
//更新装备性能指标
//equipmentSpecificIndexService.updateById(equipmentSpeIndex);
tagsMap.put("key", indexAddress+"_"+gatewayId);
fieldsMap.put("traceId", traceId);
fieldsMap.put("address", indexAddress);
fieldsMap.put("value", value);
fieldsMap.put("gatewayId", gatewayId);
fieldsMap.put("dataType", dataType);
fieldsMap.put("equipmentId", equipmentSpeIndex.getEquipmentId());
fieldsMap.put("equipmentIndexName", equipmentSpeIndex.getEquipmentIndexName());
fieldsMap.put("equipmentIndexKey", equipmentSpeIndex.getEquipmentIndexKey());
fieldsMap.put("isAlarm", equipmentSpeIndex.getIsAlarm().toString());
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
fieldsMap.put("createdTime", simpleDateFormat.format(new Date()));
fieldsMap.put("equipmentIndex", JSON.toJSONString(equipmentSpeIndex));
influxDbConnection.insert("iot_data", tagsMap, fieldsMap);
QueryWrapper<EquipmentSpecific> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("id", equipmentSpeIndex.getEquipmentSpecificId());
......@@ -690,11 +709,6 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
topicEntity.setType(equipmentSpecificVo.getType());
topicEntity.setCode(equipmentSpecificVo.getCode());
//es存储数据
eSeqService.saveESEquiplistSpecificBySystemESVO(equipmentSpeIndex, String.valueOf(equipmentSpecificVo.getSystemId()), equipmentSpecificVo.getSystemName());
//更新装备性能指标
equipmentSpecificIndexService.updateById(equipmentSpeIndex);
// 更新设备表指标状态
iEquipmentSpecificSerivce.updateEquipmentSpecIndexRealtimeData(equipmentSpeIndex);
......@@ -705,109 +719,11 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
// 添加指标报告
saveEquipmentAlarmReportDay(equipmentSpeIndex);
// 火眼数据构造告警指标逻辑
equipmentSpeIndex = handleTemperatureAlarm(equipmentSpeIndex, iotDatalist);
boolean alarmFlag = false;
Map<String, String> messageBodyMap = new HashMap<>();
//管网压力、泡沫罐信息、水箱液位告警处理
if (iotDataVO.getKey().toLowerCase().equals(CAFS_FoamTank_FoamTankLevel.toLowerCase()) ||
FHS_PipePressureDetector_PipePressure.toLowerCase().equals(iotDataVO.getKey().toLowerCase()) ||
iotDataVO.getKey().toLowerCase().equals(CAFS_WaterTank_WaterTankLevel.toLowerCase())) {
alarmFlag = doFoamTankLevel(iotDataVO, equipmentSpeIndex, messageBodyMap);
}
//消防水池液位处理
if (iotDataVO.getKey().toLowerCase().equals(FHS_FirePoolDevice_WaterLevel.toLowerCase()) ||
iotDataVO.getKey().toLowerCase().equals(FHS_WirelessliquidDetector_WaterLevel.toLowerCase())) {
alarmFlag = doWaterPoolLevel(iotDataVO, equipmentSpeIndex, messageBodyMap);
}
// 遥测数据生成告警事件、日志处理
if (iotDataVO.getKey().toLowerCase().equals(CAFS_FoamTank_FoamTankLevel.toLowerCase()) ||
FHS_PipePressureDetector_PipePressure.toLowerCase().equals(iotDataVO.getKey().toLowerCase()) ||
iotDataVO.getKey().toLowerCase().equals(CAFS_WaterTank_WaterTankLevel.toLowerCase()) ||
iotDataVO.getKey().toLowerCase().equals(FHS_FirePoolDevice_WaterLevel.toLowerCase()) ||
iotDataVO.getKey().toLowerCase().equals(FHS_WirelessliquidDetector_WaterLevel.toLowerCase())) {
handlingAlarms(equipmentSpeIndex, alarmFlag);
}
// 指标告警处理
if (equipmentSpeIndex.getIsAlarm() != null && 1 == equipmentSpeIndex.getIsAlarm()) {
equipmentSpecificAlarms.addAll(createIndexAlarmRecord(equipmentSpeIndex, messageBodyMap));
}
// 遥测遥信数据推送云端kafka
JSONObject jsonObjectXf = new JSONObject();
jsonObjectXf.put("data_class", "realdata");
if (equipmentSpeIndex.getIsTrend() == 1) {
jsonObjectXf.put("data_type", "analog");
} else {
jsonObjectXf.put("data_type", "state");
}
String date = DateUtils.date2LongStr(new Date());
jsonObjectXf.put("op_type", "subscribe_emergency");
JSONObject jsonObjectCondition = new JSONObject();
jsonObjectCondition.put("station_psr_id", stationCode);
jsonObjectCondition.put("station_name", stationName);
jsonObjectCondition.put("data_upload_time", date);
jsonObjectXf.put("condition", jsonObjectCondition);
JSONObject jsonObjectData = new JSONObject();
jsonObjectData.put("psrId", stationCode);
jsonObjectData.put("astId", equipmentSpeIndex.getSpecificCode());
jsonObjectData.put("equipType", equipmentSpeIndex.getEquipmentCode());
jsonObjectData.put("name", equipmentSpeIndex.getEquipmentSpecificName() + "-" + equipmentSpeIndex.getEquipmentSpecificIndexName());
if (value.equals("true")) {
jsonObjectData.put("value", "1");
} else if (value.equals("false")) {
jsonObjectData.put("value", "0");
} else {
jsonObjectData.put("value", value);
}
jsonObjectData.put("measurementType", null == equipmentSpeIndex.getEquipmentIndexKey() ? "" : equipmentSpeIndex.getEquipmentIndexKey());
jsonObjectData.put("dateTime", date);
jsonObjectData.put("quality", "0"); // 量测质量码:0 有效,1 无效
List<JSONObject> jsonObjects = Arrays.asList(jsonObjectData);
jsonObjectXf.put("data", jsonObjects);
// 遥测
if (!isOpenTelemetering && equipmentSpeIndex.getIsTrend() == 1) {
} else {
try {
emqKeeper.getMqttClient().publish("emq.xf.created", jsonObjectXf.toString().getBytes(), 1, false);
log.info("遥测遥信数据推送云端kafka成功");
} catch (MqttException e) {
log.error("遥测遥信数据推送云端kafka失败=====>" + e.getMessage());
e.printStackTrace();
}
}
// 报警数据保存
List<EquipmentSpecificAlarmLog> alarmLogs = new ArrayList<>();
if (!ObjectUtils.isEmpty(equipmentSpecificAlarms)) {
equipmentSpecificAlarmService.saveOrUpdateBatch(equipmentSpecificAlarms);
}
// 需要在事务提交之后,否则事务隔离查询不出数据
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
equipmentSpecificAlarms.forEach(action -> {
if (AlarmStatusEnum.BJ.getCode() == action.getStatus()) {
alarmLogs.add(addEquipAlarmLogRecord(action));
if (ValidationUtil.isEmpty(action.getAlamContent())) {
action.setAlamContent(action.getEquipmentSpecificName() + action.getEquipmentSpecificIndexName());
}
mqttSendGateway.sendToMqtt(TopicEnum.EQDQR.getTopic(), JSONArray.toJSON(action).toString());
} else {
alarmLogs.addAll(upAlarmLogStatus(action.getIotCode(), action.getEquipmentSpecificIndexKey(), action.getTraceId(),
equipmentSpecificAlarmLogService, false));
mqttSendGateway.sendToMqtt(TopicEnum.EQYQR.getTopic(), JSONArray.toJSON(action).toString());
bool = Boolean.TRUE;
}
});
// 直流中心消息推送刷新
publishDataToDCCenterPage(equipmentSpecificIndexList);
......@@ -835,47 +751,6 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
// 向画布推送
publishDataToCanvas(equipmentSpecificIndexList);
// 向其他系统推送报警
equipmentAlarmLogsToOtherSystems(alarmLogs);
if (equipmentSpecificVo.getEcode() != null) {
String ecode = equipmentSpecificVo.getEcode();
boolean flag = false;
//消防泵
String[] strings = pumpCodes.split(",");
for (String string : strings) {
if (ecode.startsWith(string)) {
//通知>消防应急预案
topicEntity.setType("xfb");
mqttSendGateway.sendToMqtt(emergencyDisposalIndicators, JSONObject.toJSONString(topicEntity));
flag = true;
break;
}
}
// 消防炮
String[] stringxfp = monitorCodes.split(",");
if (!flag) {
for (String string1 : stringxfp) {
if (ecode.startsWith(string1)) {
//通知>消防应急预案
topicEntity.setType("xfp");
mqttSendGateway.sendToMqtt(emergencyDisposalIndicators, JSONObject.toJSONString(topicEntity));
flag = true;
break;
}
}
}
//消防水源
if (!flag) {
List<Map> lit = iEquipmentSpecificSerivce.getWater(equipmentSpecificVo.getId());
if (lit != null && lit.size() > 0) {
topicEntity.setType("xfsy");
mqttSendGateway.sendToMqtt(emergencyDisposalIndicators, JSONObject.toJSONString(topicEntity));
}
}
}
}
}
});
......@@ -1139,7 +1014,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
if(vo.getEcode()!=null){
String ecode= vo.getEcode();
boolean flag=false;
//消防泵
String[] strings = pumpCodes.split(",");
for (String string : strings) {
......@@ -1151,7 +1026,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
break;
}
}
// 消防炮
String[] stringxfp = monitorCodes.split(",");
if(!flag){
......
......@@ -135,4 +135,14 @@ water.level.indexKey=FHS_FirePoolDevice_WaterLevel,FHS_LevelDetector_WaterLevel,
mileage.parameter=0.5
# 江西电建-车辆里程跨天记录切分(每日0点执行)
mileage.segmentation.cron=0 0 0 * * ?
mileage.clippingtime=600000
\ No newline at end of file
mileage.clippingtime=600000
# influxDB
spring.influx.url=http://172.16.11.201:8086
spring.influx.password=Yeejoin@2020
spring.influx.user=root
spring.influx.database=iot_platform
spring.influx.retention_policy=default
spring.influx.retention_policy_time=30d
spring.influx.actions=10000
spring.influx.bufferLimit=20000
\ No newline at end of file
......@@ -9,7 +9,7 @@ spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
#mybatis mapper file
mybatis.mapper-locations=classpath:mapper/*.xml
#mybatis-plus
mybatis-plus.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl
mybatis-plus.configuration.log-impl=org.apache.ibatis.logging.slf4j.Slf4jImpl
# mybatis entity package
mybatis.type-aliases-package=com.yeejoin.equipmanage.common.entity
spring.jackson.time-zone=GMT+8
......
......@@ -513,6 +513,8 @@
#{item}
</foreach>
</if>
and es.iot_code is not null
and es.iot_code != ''
</where>
</select>
......
......@@ -3870,5 +3870,16 @@
</sql>
</changeSet>
</databaseChangeLog>
<changeSet author="ltw" id="20230614-ltw-01">
<preConditions onFail="MARK_RAN">
<not>
<columnExists tableName="cb_organization_user" columnName="post_name"/>
</not>
</preConditions>
<comment>modify table cb_organization_user modify columns</comment>
<sql>
ALTER TABLE `cb_organization_user` MODIFY `post_name` varchar(4000) DEFAULT NULL COMMENT '岗位名称'
</sql>
</changeSet>
</databaseChangeLog>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment