Commit 4627f6b2 authored by KeYong's avatar KeYong

Merge branch 'develop_dl' into develop_dl_4.0

parents 7ee0d0f4 eca098cc
...@@ -10,7 +10,7 @@ public enum PressurePumpEnum { ...@@ -10,7 +10,7 @@ public enum PressurePumpEnum {
ALONE_START_YXSC("FHS_PressurePump_Start_ALONE_START_YXSC", "", LocalDateTime.now().getSecond() + " " + (LocalDateTime.now().getMinute()) + "/5 * * * ?", "5", ALONE_START_YXSC("FHS_PressurePump_Start_ALONE_START_YXSC", "", LocalDateTime.now().getSecond() + " " + (LocalDateTime.now().getMinute()) + "/5 * * * ?", "5",
PressurePumpValueEnum.PUMP_START_TIME.getCode(), PressurePumpMessageEnum.MESSAGE_LEVEL_QT_WJ_YXSC.getCode(), "【5】分钟"), PressurePumpValueEnum.PUMP_START_TIME.getCode(), PressurePumpMessageEnum.MESSAGE_LEVEL_QT_WJ_YXSC.getCode(), "【5】分钟"),
ALONE_START_QT("FHS_PressurePump_Start_ALONE_START_QT", PressurePumpCheckEnum.LE.getCode(), "", "5", ALONE_START_QT("FHS_PressurePump_Start_ALONE_START_QT", PressurePumpCheckEnum.BE.getCode(), "0.5", "5",
PressurePumpValueEnum.LAST_START.getCode(), PressurePumpMessageEnum.MESSAGE_LEVEL_QT_WJ.getCode(), "【5】分钟"), PressurePumpValueEnum.LAST_START.getCode(), PressurePumpMessageEnum.MESSAGE_LEVEL_QT_WJ.getCode(), "【5】分钟"),
ALONE_START_QT_WJ("FHS_PressurePump_Start_ALONE_START_QT_WJ", PressurePumpCheckEnum.BE.getCode(), "5", "30", ALONE_START_QT_WJ("FHS_PressurePump_Start_ALONE_START_QT_WJ", PressurePumpCheckEnum.BE.getCode(), "5", "30",
...@@ -19,7 +19,7 @@ public enum PressurePumpEnum { ...@@ -19,7 +19,7 @@ public enum PressurePumpEnum {
ALONE_START_QT_YZ("FHS_PressurePump_Start_ALONE_START_QT_YZ", PressurePumpCheckEnum.BE.getCode(), "30", "60", ALONE_START_QT_YZ("FHS_PressurePump_Start_ALONE_START_QT_YZ", PressurePumpCheckEnum.BE.getCode(), "30", "60",
PressurePumpValueEnum.LAST_START.getCode(), PressurePumpMessageEnum.MESSAGE_LEVEL_YZ.getCode(), "【1】小时"), PressurePumpValueEnum.LAST_START.getCode(), PressurePumpMessageEnum.MESSAGE_LEVEL_YZ.getCode(), "【1】小时"),
ALL_START_QT_WJ("FHS_PressurePump_Start_ALL_START_QT_WJ", PressurePumpCheckEnum.LE.getCode(), "", "5", ALL_START_QT_WJ("FHS_PressurePump_Start_ALL_START_QT_WJ", PressurePumpCheckEnum.BE.getCode(), "0.5", "5",
PressurePumpValueEnum.LATELY_START.getCode(), PressurePumpMessageEnum.MESSAGE_LEVEL_QT_WJ.getCode(), "【5】分钟"), PressurePumpValueEnum.LATELY_START.getCode(), PressurePumpMessageEnum.MESSAGE_LEVEL_QT_WJ.getCode(), "【5】分钟"),
START_QT_WJ_ALL("FHS_PressurePump_Start_ALL_START_QT_YZ", PressurePumpCheckEnum.BE.getCode(), "5", "30", START_QT_WJ_ALL("FHS_PressurePump_Start_ALL_START_QT_YZ", PressurePumpCheckEnum.BE.getCode(), "5", "30",
......
...@@ -155,6 +155,21 @@ public class EmergencyController extends AbstractBaseController { ...@@ -155,6 +155,21 @@ public class EmergencyController extends AbstractBaseController {
} }
@TycloudOperation(ApiLevel = UserType.AGENCY) @TycloudOperation(ApiLevel = UserType.AGENCY)
@ApiOperation("排油系统列表")
@GetMapping(value = "/selectOilDrainageNew")
public Page<Map<String, Object>> selectOilDrainageNew(@RequestParam(value = "bizOrgCode", required = false) String bizOrgCode ,
@RequestParam(value = "pageNumber") int pageNumber,
@RequestParam(value = "pageSize") int pageSize,
@RequestParam(value = "code") String code) {
Page<Map<String, Object>> page = new Page<>(pageNumber, pageSize);
if (ObjectUtils.isEmpty(bizOrgCode)){
ReginParams reginParams = getSelectedOrgInfo();
bizOrgCode = reginParams.getPersonIdentity().getBizOrgCode();
}
return iEmergencyService.selectOilDrainageNew(page, bizOrgCode, code);
}
@TycloudOperation(ApiLevel = UserType.AGENCY)
@ApiOperation("气体灭火系统") @ApiOperation("气体灭火系统")
@GetMapping(value = "/selectGasExtinguishing") @GetMapping(value = "/selectGasExtinguishing")
public Page<Map<String, Object>> selectGasExtinguishing(@RequestParam(value = "bizOrgCode", required = false) String bizOrgCode , public Page<Map<String, Object>> selectGasExtinguishing(@RequestParam(value = "bizOrgCode", required = false) String bizOrgCode ,
......
...@@ -212,11 +212,11 @@ public class SupervisionConfigureController extends AbstractBaseController { ...@@ -212,11 +212,11 @@ public class SupervisionConfigureController extends AbstractBaseController {
levelAbsLiter = volume.multiply(new BigDecimal(1000)); levelAbsLiter = volume.multiply(new BigDecimal(1000));
}else { }else {
BigDecimal abs = new BigDecimal(String.valueOf(transResult.get("abs"))); BigDecimal abs = new BigDecimal(String.valueOf(transResult.get("abs")));
levelAbsLiter = volume.multiply(abs.divide(new BigDecimal(100),2, RoundingMode.HALF_UP)); levelAbsLiter = volume.multiply(new BigDecimal(1000)).multiply(abs.divide(new BigDecimal(100),2, RoundingMode.HALF_UP));
} }
float outputFlowRate = Float.parseFloat(String.valueOf(m.get("outputFlowRate"))); float outputFlowRate = Float.parseFloat(String.valueOf(m.get("outputFlowRate")));
if (levelAbsLiter.compareTo(new BigDecimal(0)) != 0 && outputFlowRate != 0) { if (levelAbsLiter.compareTo(new BigDecimal(0)) != 0 && outputFlowRate != 0) {
double availableSeconds = (levelAbsLiter.divide(new BigDecimal(outputFlowRate), 0, RoundingMode.HALF_UP).doubleValue()); double availableSeconds = (levelAbsLiter.divide(BigDecimal.valueOf(outputFlowRate), 0, RoundingMode.HALF_UP).doubleValue());
m.put("availableTime", String.format("%.1fh", availableSeconds / 3600.0)); m.put("availableTime", String.format("%.1fh", availableSeconds / 3600.0));
} else { } else {
m.put("availableTime", "--"); m.put("availableTime", "--");
......
...@@ -53,6 +53,14 @@ public interface EmergencyMapper extends BaseMapper{ ...@@ -53,6 +53,14 @@ public interface EmergencyMapper extends BaseMapper{
*/ */
Page<Map<String, Object>> selectOilDrainage(Page<Map<String, Object>> page , @Param("bizOrgCode") String bizOrgCode, @Param("code") String code); Page<Map<String, Object>> selectOilDrainage(Page<Map<String, Object>> page , @Param("bizOrgCode") String bizOrgCode, @Param("code") String code);
/**
* 排油系统
* @param bizOrgCode
* @return
*/
Page<Map<String, Object>> selectOilDrainageNew(Page<Map<String, Object>> page , @Param("bizOrgCode") String bizOrgCode, @Param("code") String code);
/** /**
* 气体灭火系统 * 气体灭火系统
* @param bizOrgCode * @param bizOrgCode
......
...@@ -39,6 +39,8 @@ public interface IEmergencyService { ...@@ -39,6 +39,8 @@ public interface IEmergencyService {
Page<Map<String, Object>> selectOilDrainage(Page<Map<String, Object>> page, String bizOrgCode, String code); Page<Map<String, Object>> selectOilDrainage(Page<Map<String, Object>> page, String bizOrgCode, String code);
Page<Map<String, Object>> selectOilDrainageNew(Page<Map<String, Object>> page, String bizOrgCode, String code);
Page<Map<String, Object>> selectGasExtinguishing(Page<Map<String, Object>> page, String bizOrgCode, String code); Page<Map<String, Object>> selectGasExtinguishing(Page<Map<String, Object>> page, String bizOrgCode, String code);
Page<Map<String, Object>> selectPressureFlow(Page<Map<String, Object>> page, String bizOrgCode); Page<Map<String, Object>> selectPressureFlow(Page<Map<String, Object>> page, String bizOrgCode);
......
...@@ -181,6 +181,11 @@ public class EmergencyServiceImpl implements IEmergencyService { ...@@ -181,6 +181,11 @@ public class EmergencyServiceImpl implements IEmergencyService {
} }
@Override @Override
public Page<Map<String, Object>> selectOilDrainageNew(Page<Map<String, Object>> page, String bizOrgCode, String code) {
return emergencyMapper.selectOilDrainageNew(page, bizOrgCode, code);
}
@Override
public Page<Map<String, Object>> selectGasExtinguishing(Page<Map<String, Object>> page, String bizOrgCode, String code) { public Page<Map<String, Object>> selectGasExtinguishing(Page<Map<String, Object>> page, String bizOrgCode, String code) {
return emergencyMapper.selectGasExtinguishing(page, bizOrgCode, code); return emergencyMapper.selectGasExtinguishing(page, bizOrgCode, code);
} }
......
...@@ -1231,13 +1231,13 @@ public class FireFightingSystemServiceImpl extends ServiceImpl<FireFightingSyste ...@@ -1231,13 +1231,13 @@ public class FireFightingSystemServiceImpl extends ServiceImpl<FireFightingSyste
.eq(EquipmentSpecific::getSingle, true) .eq(EquipmentSpecific::getSingle, true)
); );
// 未复归设备 // 未复归设备
List<EquipmentSpecificAlarmLog> equipSpecIds = equipmentSpecificAlarmLogMapper.selectList( List<EquipmentSpecificAlarm> equipSpecIds = equipmentSpecificAlarmMapper.selectList(
Wrappers.<EquipmentSpecificAlarmLog>lambdaQuery() Wrappers.<EquipmentSpecificAlarm>lambdaQuery()
.select(EquipmentSpecificAlarmLog::getEquipmentSpecificId) .select(EquipmentSpecificAlarm::getEquipmentSpecificId)
.like(EquipmentSpecificAlarmLog::getSystemIds, id) .like(EquipmentSpecificAlarm::getSystemIds, id)
.eq(EquipmentSpecificAlarmLog::getStatus, "1") .eq(EquipmentSpecificAlarm::getStatus, "1")
); );
int count = (int) equipSpecIds.stream().map(EquipmentSpecificAlarmLog::getEquipmentSpecificId).distinct().count(); int count = (int) equipSpecIds.stream().map(EquipmentSpecificAlarm::getEquipmentSpecificId).distinct().count();
list.add(new AlarmDataVO("部件总数", equipmentCount + " 个", false)); list.add(new AlarmDataVO("部件总数", equipmentCount + " 个", false));
list.add(new AlarmDataVO("未复归设备", count + " 个", false)); list.add(new AlarmDataVO("未复归设备", count + " 个", false));
return list; return list;
......
...@@ -500,6 +500,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -500,6 +500,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
StringBuilder endIndex = new StringBuilder(iotCode).insert(8, '/'); StringBuilder endIndex = new StringBuilder(iotCode).insert(8, '/');
String iotTopic = "influxdb/" + endIndex; String iotTopic = "influxdb/" + endIndex;
JSONObject msg = new JSONObject(); JSONObject msg = new JSONObject();
msg.put("traceId", equipmentSpeIndex.getId() + "");
msg.put(equipmentSpeIndex.getEquipmentIndexKey(), value); msg.put(equipmentSpeIndex.getEquipmentIndexKey(), value);
mqttSendGateway.sendToMqtt(iotTopic, JSON.toJSONString(msg)); mqttSendGateway.sendToMqtt(iotTopic, JSON.toJSONString(msg));
...@@ -560,7 +561,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -560,7 +561,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
saveEquipmentAlarmReportDay(equipmentSpeIndex, alarmFlag); saveEquipmentAlarmReportDay(equipmentSpeIndex, alarmFlag);
// 指标告警处理 // 指标告警处理
if (equipmentSpeIndex.getIsAlarm() != null && 1 == equipmentSpeIndex.getIsAlarm()) { if (equipmentSpeIndex.getIsAlarm() != null && 1 == equipmentSpeIndex.getIsAlarm() && !equipmentSpeIndex.getEquipmentIndexKey().equals(pressurePumpStart)) {
equipmentSpecificAlarms.addAll(createIndexAlarmRecord(equipmentSpeIndex, messageBodyMap)); equipmentSpecificAlarms.addAll(createIndexAlarmRecord(equipmentSpeIndex, messageBodyMap));
} }
// 遥测遥信数据推送云端kafka // 遥测遥信数据推送云端kafka
...@@ -622,6 +623,16 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -622,6 +623,16 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override @Override
public void afterCommit() { public void afterCommit() {
iotDatalist.forEach(iotDataVO -> {
String indexKey = iotDataVO.getKey();
String indexValue = iotDataVO.getValue().toString();
// 稳压泵启停信号处理
if (indexKey.equals(pressurePumpStart)) {
pressurePump(indexKey, indexValue, iotDatalist, topicEntity);
}
});
equipmentSpecificAlarms.forEach(action -> { equipmentSpecificAlarms.forEach(action -> {
if (AlarmStatusEnum.BJ.getCode() == action.getStatus()) { if (AlarmStatusEnum.BJ.getCode() == action.getStatus()) {
alarmLogs.add(addEquipAlarmLogRecord(action)); alarmLogs.add(addEquipAlarmLogRecord(action));
...@@ -781,6 +792,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -781,6 +792,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
String iotTopic = "influxdb/" + endIndex; String iotTopic = "influxdb/" + endIndex;
JSONObject msg = new JSONObject(); JSONObject msg = new JSONObject();
msg.put(equipmentSpeIndex.getEquipmentIndexKey(), value); msg.put(equipmentSpeIndex.getEquipmentIndexKey(), value);
msg.put("traceId", equipmentSpeIndex.getId() + "");
mqttSendGateway.sendToMqtt(iotTopic, JSON.toJSONString(msg)); mqttSendGateway.sendToMqtt(iotTopic, JSON.toJSONString(msg));
List<EquipmentSpecificVo> eqIotCodeList = iEquipmentSpecificSerivce.getEquipAndCarIotcodeByIotcode(iotCode); List<EquipmentSpecificVo> eqIotCodeList = iEquipmentSpecificSerivce.getEquipAndCarIotcodeByIotcode(iotCode);
...@@ -839,7 +851,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -839,7 +851,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
saveEquipmentAlarmReportDay(equipmentSpeIndex, alarmFlag); saveEquipmentAlarmReportDay(equipmentSpeIndex, alarmFlag);
// 指标告警处理 // 指标告警处理
if (equipmentSpeIndex.getIsAlarm() != null && 1 == equipmentSpeIndex.getIsAlarm()) { if (equipmentSpeIndex.getIsAlarm() != null && 1 == equipmentSpeIndex.getIsAlarm() && !equipmentSpeIndex.getEquipmentIndexKey().equals(pressurePumpStart)) {
equipmentSpecificAlarms.addAll(createIndexAlarmRecord(equipmentSpeIndex, messageBodyMap)); equipmentSpecificAlarms.addAll(createIndexAlarmRecord(equipmentSpeIndex, messageBodyMap));
} }
// 遥测遥信数据推送云端kafka // 遥测遥信数据推送云端kafka
...@@ -900,6 +912,16 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -900,6 +912,16 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override @Override
public void afterCommit() { public void afterCommit() {
iotDatalist.forEach(iotDataVO -> {
String indexKey = iotDataVO.getKey();
String indexValue = iotDataVO.getValue().toString();
// 稳压泵启停信号处理
if (indexKey.equals(pressurePumpStart)) {
pressurePump(indexKey, indexValue, iotDatalist, topicEntity);
}
});
equipmentSpecificAlarms.forEach(action -> { equipmentSpecificAlarms.forEach(action -> {
if (AlarmStatusEnum.BJ.getCode() == action.getStatus()) { if (AlarmStatusEnum.BJ.getCode() == action.getStatus()) {
alarmLogs.add(addEquipAlarmLogRecord(action)); alarmLogs.add(addEquipAlarmLogRecord(action));
...@@ -2460,6 +2482,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -2460,6 +2482,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
// } // }
private void pressurePump(String indexKey, String indexValue, List<IotDataVO> iotDatalist, TopicEntityVo topicEntity) { private void pressurePump(String indexKey, String indexValue, List<IotDataVO> iotDatalist, TopicEntityVo topicEntity) {
log.info("开始处理稳压泵逻辑:{}值:{}", indexKey, indexValue);
List<String> listIndex = new ArrayList<>(); List<String> listIndex = new ArrayList<>();
listIndex.add(pressurePumpStart); listIndex.add(pressurePumpStart);
// 获取全部启停泵信号 // 获取全部启停泵信号
...@@ -2487,6 +2510,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -2487,6 +2510,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
PressurePumpValueEnum valueEnum = PressurePumpValueEnum.getByCode(pressurePumpEnum.getCompareValue()); PressurePumpValueEnum valueEnum = PressurePumpValueEnum.getByCode(pressurePumpEnum.getCompareValue());
assert valueEnum != null; assert valueEnum != null;
EquipmentSpecificIndex data = getPressurePumpDateByType(indexKey, valueEnum, topicEntity, equipmentSpeIndexList, pressurePumpEnum); EquipmentSpecificIndex data = getPressurePumpDateByType(indexKey, valueEnum, topicEntity, equipmentSpeIndexList, pressurePumpEnum);
log.info("稳压泵获取{}, 值为{}", valueEnum.getDescribe(), data);
Date newDate = new Date(); Date newDate = new Date();
// 2. 校验 // 2. 校验
if (!ObjectUtils.isEmpty(data.getUpdateDate())) { if (!ObjectUtils.isEmpty(data.getUpdateDate())) {
...@@ -2513,7 +2537,6 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -2513,7 +2537,6 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
} else { } else {
throw new BadRequest("装备物联编码错误,请确认!"); throw new BadRequest("装备物联编码错误,请确认!");
} }
switch (valueEnum) { switch (valueEnum) {
case LAST_STOP: case LAST_STOP:
List<EquipmentSpecificIndex> lastStop = equipmentSpeIndexList.stream().filter(e -> List<EquipmentSpecificIndex> lastStop = equipmentSpeIndexList.stream().filter(e ->
...@@ -2613,6 +2636,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService { ...@@ -2613,6 +2636,7 @@ public class MqttReceiveServiceImpl implements MqttReceiveService {
assert pumpCheckEnum != null; assert pumpCheckEnum != null;
String leftValue = pressurePumpEnum.getLeftValue(); String leftValue = pressurePumpEnum.getLeftValue();
String rightValue = pressurePumpEnum.getRightValue(); String rightValue = pressurePumpEnum.getRightValue();
log.info("检验方式:{},大于:{},小于:{}, 间隔:{}", pumpCheckEnum.getDescribe(), leftValue, rightValue, diff);
switch (pumpCheckEnum) { switch (pumpCheckEnum) {
case LE: case LE:
if (StringUtil.isNotEmpty(rightValue)) { if (StringUtil.isNotEmpty(rightValue)) {
......
...@@ -2,6 +2,9 @@ package com.yeejoin.equipmanage.service.impl; ...@@ -2,6 +2,9 @@ package com.yeejoin.equipmanage.service.impl;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.yeejoin.amos.boot.biz.common.utils.DateUtils; import com.yeejoin.amos.boot.biz.common.utils.DateUtils;
import com.yeejoin.amos.boot.biz.common.utils.RedisUtils; import com.yeejoin.amos.boot.biz.common.utils.RedisUtils;
import com.yeejoin.equipmanage.common.dto.OrgUsrDto; import com.yeejoin.equipmanage.common.dto.OrgUsrDto;
...@@ -136,8 +139,8 @@ public class PressurePumpServiceImpl implements IPressurePumpService { ...@@ -136,8 +139,8 @@ public class PressurePumpServiceImpl implements IPressurePumpService {
String time = split.length > 0 ? (split[split.length - 1]) : ""; String time = split.length > 0 ? (split[split.length - 1]) : "";
try { try {
Date date = DateUtils.convertStrToDate(time, DateUtils.DATE_PATTERN); Date date = DateUtils.convertStrToDate(time, DateUtils.DATE_PATTERN);
// 结束日期不包含今天,获取3天前数据 // 结束日期不包含今天,获取3天前数据 改为统计近三天 包含当天
if (DateUtils.dateCompare(date, startDate) >= 0 && DateUtils.dateCompare(endDate, date) > 0) { if (DateUtils.dateCompare(date, startDate) >= 0 && DateUtils.dateCompare(endDate, date) >= 0) {
list.add(JSON.parseObject(redisUtils.get(x).toString(), PressurePumpCountVo.class)); list.add(JSON.parseObject(redisUtils.get(x).toString(), PressurePumpCountVo.class));
} }
} catch (ParseException e) { } catch (ParseException e) {
......
...@@ -29,6 +29,7 @@ import org.springframework.beans.factory.annotation.Value; ...@@ -29,6 +29,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import org.typroject.tyboot.core.foundation.utils.DateTimeUtil;
import org.typroject.tyboot.core.restful.exception.instance.BadRequest; import org.typroject.tyboot.core.restful.exception.instance.BadRequest;
import org.typroject.tyboot.core.restful.utils.ResponseModel; import org.typroject.tyboot.core.restful.utils.ResponseModel;
...@@ -40,6 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger; ...@@ -40,6 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.typroject.tyboot.core.foundation.context.RequestContext.*; import static org.typroject.tyboot.core.foundation.context.RequestContext.*;
import static org.typroject.tyboot.core.foundation.utils.DateTimeUtil.ISO8601_DATE_HOUR_MIN_SEC;
@Slf4j @Slf4j
...@@ -49,7 +51,7 @@ public class SupervisionVideoServiceImpl extends ServiceImpl<SupervisionVideoMap ...@@ -49,7 +51,7 @@ public class SupervisionVideoServiceImpl extends ServiceImpl<SupervisionVideoMap
@Autowired @Autowired
SupervisionVideoMapper supervisionVideoMapper; SupervisionVideoMapper supervisionVideoMapper;
@Autowired @Autowired
private IVideoService videoService; private IVideoService videoService;
@Autowired @Autowired
...@@ -129,7 +131,7 @@ public class SupervisionVideoServiceImpl extends ServiceImpl<SupervisionVideoMap ...@@ -129,7 +131,7 @@ public class SupervisionVideoServiceImpl extends ServiceImpl<SupervisionVideoMap
// 近3天启动平均值,四合五入 // 近3天启动平均值,四合五入
AtomicInteger dayAvgFrequency = new AtomicInteger(); AtomicInteger dayAvgFrequency = new AtomicInteger();
try { try {
Date startDate = com.yeejoin.amos.boot.biz.common.utils.DateUtils.dateAddDays(dateNow, Integer.parseInt(PressurePumpRelateEnum.DAY_AVG.getValue())); Date startDate = com.yeejoin.amos.boot.biz.common.utils.DateUtils.dateAddDays(dateNow, Integer.parseInt(PressurePumpRelateEnum.DAY_AVG.getValue()) + 1);
String startTime = String.join(" ", com.yeejoin.amos.boot.biz.common.utils.DateUtils.dateFormat(startDate, com.yeejoin.amos.boot.biz.common.utils.DateUtils.DATE_PATTERN), "00:00:00"); String startTime = String.join(" ", com.yeejoin.amos.boot.biz.common.utils.DateUtils.dateFormat(startDate, com.yeejoin.amos.boot.biz.common.utils.DateUtils.DATE_PATTERN), "00:00:00");
String endTime = String.join(" ", com.yeejoin.amos.boot.biz.common.utils.DateUtils.dateFormat(dateNow, com.yeejoin.amos.boot.biz.common.utils.DateUtils.DATE_PATTERN), "23:59:59"); String endTime = String.join(" ", com.yeejoin.amos.boot.biz.common.utils.DateUtils.dateFormat(dateNow, com.yeejoin.amos.boot.biz.common.utils.DateUtils.DATE_PATTERN), "23:59:59");
Map<String, List<PressurePumpCountVo>> dayAvgDataMap = pressurePumpService.getDateRangeCountList(pumpInfoList, startTime,endTime, PressurePumpRelateEnum.PRESSURE_PUMP.getValue(), countRedisKey, equipmentCode, pressurePumpStart, countExpire, bizOrgCode); Map<String, List<PressurePumpCountVo>> dayAvgDataMap = pressurePumpService.getDateRangeCountList(pumpInfoList, startTime,endTime, PressurePumpRelateEnum.PRESSURE_PUMP.getValue(), countRedisKey, equipmentCode, pressurePumpStart, countExpire, bizOrgCode);
...@@ -344,7 +346,11 @@ public class SupervisionVideoServiceImpl extends ServiceImpl<SupervisionVideoMap ...@@ -344,7 +346,11 @@ public class SupervisionVideoServiceImpl extends ServiceImpl<SupervisionVideoMap
Map<String, String> map = new HashMap<>(); Map<String, String> map = new HashMap<>();
map.put("id", UUID.randomUUID().toString()); map.put("id", UUID.randomUUID().toString());
map.put("name", String.valueOf(item.get("name"))); map.put("name", String.valueOf(item.get("name")));
map.put("time", String.valueOf(x.get("time")).substring(0, 19).replace("T", " ")); try {
map.put("time", change(String.valueOf(x.get("time"))));
} catch (ParseException e) {
throw new RuntimeException(e);
}
map.put("value", String.valueOf(x.get(pressurePumpStart))); map.put("value", String.valueOf(x.get(pressurePumpStart)));
finalResList.add(map); finalResList.add(map);
}); });
...@@ -386,4 +392,17 @@ public class SupervisionVideoServiceImpl extends ServiceImpl<SupervisionVideoMap ...@@ -386,4 +392,17 @@ public class SupervisionVideoServiceImpl extends ServiceImpl<SupervisionVideoMap
public List<Map<String, Object>> selectAllPressureName(String bizOrgCode) { public List<Map<String, Object>> selectAllPressureName(String bizOrgCode) {
return fireFightingSystemMapper.selectAllPressureName(bizOrgCode); return fireFightingSystemMapper.selectAllPressureName(bizOrgCode);
} }
private String change(String time) throws ParseException {
Date date1 = null;
try {
String strDate = time.substring(0, 19);
SimpleDateFormat sdf = new SimpleDateFormat(ISO8601_DATE_HOUR_MIN_SEC);
sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
date1 = sdf.parse(strDate);
} catch (ParseException e) {
e.printStackTrace();
}
return DateTimeUtil.format(date1, DateTimeUtil.ISO_DATE_HOUR24_MIN_SEC);
}
} }
...@@ -1141,7 +1141,9 @@ ...@@ -1141,7 +1141,9 @@
<changeSet author="keyong" id="168623599"> <changeSet author="keyong" id="168623599">
<preConditions onFail="MARK_RAN"> <preConditions onFail="MARK_RAN">
<tableExists tableName="wl_car" /> <not>
<columnExists tableName="wl_car" columnName="max_speed"/>
</not>
</preConditions> </preConditions>
<comment>新增属性数据</comment> <comment>新增属性数据</comment>
<sql> <sql>
......
...@@ -51,7 +51,7 @@ ...@@ -51,7 +51,7 @@
( 0 <![CDATA[<>]]> find_in_set( `fs`.`id`, `wesa`.`system_ids` ) ( 0 <![CDATA[<>]]> find_in_set( `fs`.`id`, `wesa`.`system_ids` )
AND `wesa`.`status` = 1)) > 0 AND `wesa`.`status` = 1)) > 0
), ),
'异常', '告警',
'正常' '正常'
) AS `status` ) AS `status`
FROM FROM
...@@ -246,38 +246,31 @@ ...@@ -246,38 +246,31 @@
wel.warehouse_structure_id ,wel.equipment_code wel.warehouse_structure_id ,wel.equipment_code
</select> </select>
<select id="selectFireMonitor" resultType="java.util.Map"> <select id="selectFireMonitor" resultType="java.util.Map">
SELECT temp.name, temp.alarm ,temp.id, temp.code, SELECT
IFNULL((SELECT wesi.value FROM wl_equipment_specific_index wesi WHERE wesi.equipment_specific_id = temp.id AND wesi.equipment_index_key = 'CAFS_CAFSFireGun_FireGunPressure' temp.name,
ORDER BY wesi.update_date desc LIMIT 1),0) AS value, temp.alarm,
IFNULL((SELECT CASE WHEN wesi.value_label IS NULL OR trim( wesi.value_label ) = '' THEN (CASE temp.id,
wesi.`value` temp.code,
WHEN 'true' THEN IFNULL((SELECT wesi.value FROM wl_equipment_specific_index wesi WHERE wesi.equipment_specific_id = temp.id AND wesi.equipment_index_key = 'CAFS_CAFSFireGun_FireGunPressure'
concat( wesi.equipment_index_name, '(是)' ) ORDER BY wesi.update_date desc LIMIT 1), 0) AS value,
WHEN 'false' THEN IFNULL((SELECT CASE WHEN wesi.`value` = 'true' THEN wesi.equipment_index_name ELSE '关到位' END FROM wl_equipment_specific_index wesi
concat( wesi.equipment_index_name, '(否)' ) WHERE wesi.equipment_specific_id = temp.id AND wesi.equipment_index_key = 'CAFS_GunValve_Open' ORDER BY wesi.update_date DESC LIMIT 1),'关到位') AS status,
ELSE IFNULL((SELECT wesi.`value` FROM wl_equipment_specific_index wesi WHERE wesi.equipment_specific_id = temp.id AND wesi.equipment_index_key = 'CAFS_GunValve_Flow' ORDER BY wesi.update_date DESC LIMIT 1), 0) AS flow
wesi.equipment_index_name FROM
END) ELSE wesi.value_label END FROM wl_equipment_specific_index wesi (
LEFT JOIN wl_equipment_index wei on wesi.equipment_index_id = wei.id
WHERE wesi.equipment_specific_id = temp.id AND wei.is_trend = 0
AND wesi.value is NOT NULL AND wesi.value <![CDATA[<>]]> ''
ORDER BY wesi.update_date desc LIMIT 1),'--') AS status,
'0' AS flow
FROM(
SELECT wes.id,wes.name,wes.code, SELECT wes.id,wes.name,wes.code,
CASE CASE WHEN (SELECT is_alarm FROM wl_equipment_specific_index wesi WHERE wesi.equipment_specific_id = wes.id AND wesi.equipment_index_key = wes.realtime_iot_index_key ) = 1
WHEN ( SELECT is_alarm FROM wl_equipment_specific_index wesi WHERE wesi.equipment_specific_id = wes.id AND wesi.equipment_index_key = wes.realtime_iot_index_key ) = 1 AND wes.realtime_iot_index_value = 'true' THEN
AND wes.realtime_iot_index_value = 'true' THEN 1 ELSE 0
1 ELSE 0 END AS alarm
END AS alarm
FROM wl_equipment_specific wes FROM wl_equipment_specific wes
JOIN f_equipment_fire_equipment fire ON wes.id = fire.fire_equipment_id JOIN f_equipment_fire_equipment fire ON wes.id = fire.fire_equipment_id
WHERE WHERE
wes.equipment_code LIKE concat( '920322', '%' ) wes.equipment_code LIKE concat( '920322', '%' )
<if test="code != null and code!='' "> <if test="code != null and code!='' ">
AND fire.equipment_id = (SELECT id FROM f_equipment WHERE `code` = #{code}) AND fire.equipment_id = (SELECT id FROM f_equipment WHERE `code` = #{code})
</if> </if>
AND wes.biz_org_code LIKE CONCAT( #{bizOrgCode}, '%' ) AND wes.biz_org_code LIKE CONCAT( #{bizOrgCode}, '%' )
) temp ) temp
</select> </select>
<select id="selectFoamTank" resultType="java.util.Map"> <select id="selectFoamTank" resultType="java.util.Map">
...@@ -442,6 +435,102 @@ ...@@ -442,6 +435,102 @@
) )
</select> </select>
<select id="selectOilDrainageNew" resultType="java.util.Map">
SELECT
wes.id,
'3' AS type,
(
SELECT
IF
(
wesi.`value` = 'true'
AND wesi.equipment_index_key = 'ONL_DrainOilValve_Open',
( SELECT emergency_level FROM wl_equipment_index WHERE name_key = 'ONL_DrainOilValve_Open' ),
( SELECT emergency_level FROM wl_equipment_index WHERE name_key = 'ONL_DrainOilValve_Close' ))
FROM
wl_equipment_specific_index wesi
WHERE
wesi.equipment_specific_id = wes.id
AND wesi.equipment_index_key IN ( 'ONL_DrainOilValve_Open', 'ONL_DrainOilValve_Close' )
AND wesi.`value` = 'true'
ORDER BY
wesi.update_date DESC
LIMIT 1
) AS level,
wes.CODE AS code,
wes.name,
(
SELECT
IF
( wesi.`value` = 'true' AND wesi.equipment_index_key = 'ONL_DrainOilValve_Open', '全开', '全关' )
FROM
wl_equipment_specific_index wesi
WHERE
wesi.equipment_specific_id = wes.id
AND wesi.equipment_index_key IN ( 'ONL_DrainOilValve_Open', 'ONL_DrainOilValve_Close' )
AND wesi.`value` = 'true'
ORDER BY
wesi.update_date DESC
LIMIT 1
) AS status
FROM
wl_equipment_specific wes
JOIN f_equipment_fire_equipment fire ON wes.id = fire.fire_equipment_id
WHERE
wes.equipment_code LIKE concat( '921004', '%' )
<if test="code != null and code!='' ">
AND fire.equipment_id = (SELECT id FROM f_equipment WHERE `code` = #{code})
</if>
<if test="code != null and code!='' ">
UNION ALL
(
SELECT
wes.id,
'3' AS type,
(
SELECT
IF
(
wesi.`value` = 'true'
AND wesi.equipment_index_key = 'ONL_DrainOilValve_Open',
( SELECT emergency_level FROM wl_equipment_index WHERE name_key = 'ONL_DrainOilValve_Open' ),
( SELECT emergency_level FROM wl_equipment_index WHERE name_key = 'ONL_DrainOilValve_Close' ))
FROM
wl_equipment_specific_index wesi
WHERE
wesi.equipment_specific_id = wes.id
AND wesi.equipment_index_key IN ( 'ONL_DrainOilValve_Open', 'ONL_DrainOilValve_Close' )
AND wesi.`value` = 'true'
ORDER BY
wesi.update_date DESC
LIMIT 1
) AS level,
wes.CODE AS code,
wes.name,
(
SELECT
IF
( wesi.`value` = 'true' AND wesi.equipment_index_key = 'ONL_DrainOilValve_Open', '全开', '全关' )
FROM
wl_equipment_specific_index wesi
WHERE
wesi.equipment_specific_id = wes.id
AND wesi.equipment_index_key IN ( 'ONL_DrainOilValve_Open', 'ONL_DrainOilValve_Close' )
AND wesi.`value` = 'true'
ORDER BY
wesi.update_date DESC
LIMIT 1
) AS status
FROM
wl_equipment_specific wes
JOIN f_equipment_fire_equipment fire ON wes.id = fire.fire_equipment_id
WHERE
wes.equipment_code LIKE concat( '921004', '%' )
AND fire.equipment_id != (SELECT id FROM f_equipment WHERE `code` = #{code})
)
</if>
</select>
<select id="selectGasExtinguishing" resultType="java.util.Map"> <select id="selectGasExtinguishing" resultType="java.util.Map">
SELECT SELECT
wes.id, wes.id,
......
...@@ -6443,10 +6443,10 @@ ...@@ -6443,10 +6443,10 @@
SELECT SELECT
IFNULL(SUM(IF(r.`status` = 0, 1, 0)), 0) AS normalNum, IFNULL(SUM(IF(r.`status` = 0, 1, 0)), 0) AS normalNum,
IFNULL(SUM(IF((r.type = 'BREAKDOWN' AND r.`status` = 1), 1, 0)), 0) AS faultNum, IFNULL(SUM(IF((r.type = 'BREAKDOWN' AND r.`status` = 1), 1, 0)), 0) AS faultNum,
IFNULL(SUM(IF((r.type = 'FIREALARM' AND r.`status` = 1), 1, 0)), 0) AS alarmNum, IFNULL(SUM(IF((r.type != 'BREAKDOWN' AND r.`status` = 1), 1, 0)), 0) AS alarmNum,
DATE_FORMAT( r.update_date, '%Y-%m-%d') AS date DATE_FORMAT( r.update_date, '%Y-%m-%d') AS date
FROM FROM
wl_equipment_specific_alarm_log r wl_equipment_specific_alarm r
LEFT JOIN f_fire_fighting_system fs ON FIND_IN_SET( fs.id, r.system_ids ) LEFT JOIN f_fire_fighting_system fs ON FIND_IN_SET( fs.id, r.system_ids )
<where> <where>
r.update_date BETWEEN date_sub( now(), INTERVAL 6 MONTH ) AND now() r.update_date BETWEEN date_sub( now(), INTERVAL 6 MONTH ) AND now()
...@@ -6455,6 +6455,7 @@ ...@@ -6455,6 +6455,7 @@
</if> </if>
</where> </where>
GROUP BY GROUP BY
r.equipment_specific_id,
LEFT ( r.update_date, 10 ) LEFT ( r.update_date, 10 )
ORDER BY ORDER BY
r.update_date r.update_date
...@@ -6491,10 +6492,10 @@ ...@@ -6491,10 +6492,10 @@
SELECT SELECT
IFNULL(SUM(IF(r.`status` = 0, 1, 0)), 0) AS normalNum, IFNULL(SUM(IF(r.`status` = 0, 1, 0)), 0) AS normalNum,
IFNULL(SUM(IF((r.type = 'BREAKDOWN' AND r.`status` = 1), 1, 0)), 0) AS faultNum, IFNULL(SUM(IF((r.type = 'BREAKDOWN' AND r.`status` = 1), 1, 0)), 0) AS faultNum,
IFNULL(SUM(IF((r.type = 'FIREALARM' AND r.`status` = 1), 1, 0)), 0) AS alarmNum, IFNULL(SUM(IF((r.type != 'BREAKDOWN' AND r.`status` = 1), 1, 0)), 0) AS alarmNum,
DATE_FORMAT( r.update_date, '%Y-%m-%d') AS date DATE_FORMAT( r.update_date, '%Y-%m-%d') AS date
FROM FROM
wl_equipment_specific_alarm_log r wl_equipment_specific_alarm r
LEFT JOIN f_fire_fighting_system fs ON FIND_IN_SET( fs.id, r.system_ids ) LEFT JOIN f_fire_fighting_system fs ON FIND_IN_SET( fs.id, r.system_ids )
<where> <where>
r.update_date BETWEEN date_sub( now(), INTERVAL 15 MONTH ) AND now() r.update_date BETWEEN date_sub( now(), INTERVAL 15 MONTH ) AND now()
...@@ -6503,6 +6504,7 @@ ...@@ -6503,6 +6504,7 @@
</if> </if>
</where> </where>
GROUP BY GROUP BY
r.equipment_specific_id,
LEFT ( r.update_date, 10 ) LEFT ( r.update_date, 10 )
ORDER BY ORDER BY
r.update_date r.update_date
......
...@@ -52,6 +52,17 @@ ...@@ -52,6 +52,17 @@
</exclusion> </exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
<!-- 读取excel文件 -->
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
<version>5.2.3</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>5.2.3</version>
</dependency>
</dependencies> </dependencies>
<build> <build>
......
...@@ -5,20 +5,31 @@ import com.yeejoin.amos.message.utils.ClassToJsonUtil; ...@@ -5,20 +5,31 @@ import com.yeejoin.amos.message.utils.ClassToJsonUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONObject; import net.sf.json.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.poi.ss.usermodel.*;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;
import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.io.Resource; import org.springframework.core.io.Resource;
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.typroject.tyboot.component.emq.EmqKeeper; import org.typroject.tyboot.component.emq.EmqKeeper;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.ArrayList;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import static com.yeejoin.amos.message.kafka.Constant.*;
/** /**
* kafka 消费服务 * kafka 消费服务
...@@ -28,7 +39,7 @@ import static com.yeejoin.amos.message.kafka.Constant.*; ...@@ -28,7 +39,7 @@ import static com.yeejoin.amos.message.kafka.Constant.*;
**/ **/
@Slf4j @Slf4j
@Service @Service
public class KafkaConsumerService { public class KafkaConsumerService implements ApplicationRunner {
private static final String MQTT_TOPIC = "romaSite/data/transmit"; private static final String MQTT_TOPIC = "romaSite/data/transmit";
private static final String PROVINCE_MQTT_TOPIC = "province/data/transport"; private static final String PROVINCE_MQTT_TOPIC = "province/data/transport";
...@@ -40,67 +51,20 @@ public class KafkaConsumerService { ...@@ -40,67 +51,20 @@ public class KafkaConsumerService {
@Value("classpath:/json/commonMessage.json") @Value("classpath:/json/commonMessage.json")
private Resource commonMessage; private Resource commonMessage;
// /** /**
// * 批量消费kafka消息 * execl文件路径,读取excel
// * Kafka消息转emq */
// * // @Value("${filter.excel.path:F:\\filterExcel11.xlsx}")
// * @param consumerRecords messages @Value("${filter.excel.path:}")
// * @param ack ack private String filePath;
// */
// @KafkaListener(id = "consumerSingle", groupId = "zhTestGroup", topics = "#{'${kafka.topics}'.split(',')}")
// public void listen1(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
// try {
// for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
// Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
// if (kafkaMessage.isPresent()) {
// JSONObject messageObj = JSONObject.fromObject(kafkaMessage.get());
// if (messageObj.has(TOPIC)) {
// emqKeeper.getMqttClient().publish(messageObj.optString(TOPIC), messageObj.getJSONObject(DATA).toString()
// .getBytes(StandardCharsets.UTF_8), 0, false);
// }
// log.info("kafka消费zhTestGroup消息{}", messageObj);
// }
// }
// } catch (Exception e) {
// log.error("kafka失败,当前失败的批次: data:{}, {}", consumerRecords, e);
// } finally {
// ack.acknowledge();
// }
// }
/** /**
* 批量消费kafka消息 * 服务启动时,内存存储execl文档中需要的编码信息
* 监听数据表变化kafka数据转发emq
*
* @param consumerRecords messages
* @param ack ack
*/ */
// @KafkaListener(id = "provinceMessage", groupId = "province", topics = "#{'${kafka.topics}'.split(',')}") private List<String> codeListInfo;
// public void listen(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {
// try { private static final String topic1 = "romaSite/data/transmit";
// if (isZxj) { private static final String topic2 = "romaSite/data/eventAlarm";
// for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
// Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
// if (kafkaMessage.isPresent()) {
// JSONObject messageObj = JSONObject.fromObject(kafkaMessage.get());
// String type = messageObj.optString(TYPE);
// String table = messageObj.optString(TABLE);
// if (Arrays.asList(INSERT, UPDATE).contains(type) && DBTableTypeEnum.have(table) != null) {
// JSONObject data = (JSONObject) messageObj.getJSONArray(DATA).get(0);
// data.put(DB_TYPE, type);
// data.put(TABLE, table);
// emqKeeper.getMqttClient().publish(PROVINCE_MQTT_TOPIC, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
// log.info("kafka消费province消息{}", messageObj);
// }
// }
// }
// }
// } catch (Exception e) {
// log.error("kafka失败,当前失败的批次: data:{}, {}", consumerRecords, e);
// } finally {
// ack.acknowledge();
// }
// }
/** /**
* 转发苏州,绍兴换流站Kafka数据对emq * 转发苏州,绍兴换流站Kafka数据对emq
...@@ -113,17 +77,11 @@ public class KafkaConsumerService { ...@@ -113,17 +77,11 @@ public class KafkaConsumerService {
try { try {
Optional<?> messages = Optional.ofNullable(record.value()); Optional<?> messages = Optional.ofNullable(record.value());
if (messages.isPresent()) { if (messages.isPresent()) {
// JSONObject messageObj = JSONObject.fromObject(record.value());
// if (messageObj.getJSONObject(BODY).isEmpty()) {
// messageObj.put(DATA_TYPE, STATE);
// }
// JSONObject object = JSONObject.fromObject(record.value());
// String json = ClassToJsonUtil.class2json(object, commonMessage, MQTT_TOPIC);
// emqKeeper.getMqttClient().publish(MQTT_TOPIC, json.getBytes(StandardCharsets.UTF_8), 0, false);
JSONObject object = JSONObject.fromObject(record.value()); JSONObject object = JSONObject.fromObject(record.value());
com.alibaba.fastjson.JSONObject jsonObj = ClassToJsonUtil.class2json(object, commonMessage, record.topic()); com.alibaba.fastjson.JSONObject jsonObj = ClassToJsonUtil.class2json(object, commonMessage, record.topic());
emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 0, false); if ((StringUtils.isEmpty(filePath)) || (CollectionUtils.isEmpty(codeListInfo)) || (!ObjectUtils.isEmpty(jsonObj) && Boolean.TRUE.equals(isSendEmq(jsonObj)))) {
emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 0, false);
}
} }
} catch (MqttException e) { } catch (MqttException e) {
log.error("换流站转发Kafka消息失败" + e.getMessage(), e); log.error("换流站转发Kafka消息失败" + e.getMessage(), e);
...@@ -139,30 +97,11 @@ public class KafkaConsumerService { ...@@ -139,30 +97,11 @@ public class KafkaConsumerService {
Optional<?> message = Optional.ofNullable(record.value()); Optional<?> message = Optional.ofNullable(record.value());
if (message.isPresent()) { if (message.isPresent()) {
try { try {
// JSONObject messageObj = JSONObject.fromObject(record.value());
// JSONArray dataArray = messageObj.getJSONArray("data");
// JSONArray jsonArray = new JSONArray();
// String timestamp = "";
// for (Object obj : dataArray) {
// JSONObject finallyObj = new JSONObject();
// com.alibaba.fastjson.JSONObject detail = com.alibaba.fastjson.JSONObject.parseObject(com.alibaba.fastjson.JSONObject.toJSONString(obj));
// finallyObj.put("eventtextL1", detail.get("description"));
// finallyObj.put("pointId", detail.get("astId"));
// finallyObj.put("time", detail.get("dateTime"));
// jsonArray.add(finallyObj);
// timestamp = detail.get("dateTime").toString();
// }
// JSONObject jsonObjectMessage = new JSONObject();
// jsonObjectMessage.put("warns", jsonArray);
// jsonObjectMessage.put("timestamp", timestamp);
// JSONObject object = JSONObject.fromObject(record.value());
// String json = ClassToJsonUtil.class2json(object, commonMessage, MQTT_TOPIC_EVENT_ALARM);
// emqKeeper.getMqttClient().publish(MQTT_TOPIC_EVENT_ALARM, json.getBytes(StandardCharsets.UTF_8), 0, false);
JSONObject object = JSONObject.fromObject(record.value()); JSONObject object = JSONObject.fromObject(record.value());
com.alibaba.fastjson.JSONObject jsonObj = ClassToJsonUtil.class2json(object, commonMessage, record.topic()); com.alibaba.fastjson.JSONObject jsonObj = ClassToJsonUtil.class2json(object, commonMessage, record.topic());
emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 0, false); if ((StringUtils.isEmpty(filePath)) || (CollectionUtils.isEmpty(codeListInfo)) || (!ObjectUtils.isEmpty(jsonObj) && Boolean.TRUE.equals(isSendEmq(jsonObj)))) {
emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 0, false);
}
ack.acknowledge(); ack.acknowledge();
} catch (MqttException e) { } catch (MqttException e) {
log.error("解析数据失败,{}", e.getMessage()); log.error("解析数据失败,{}", e.getMessage());
...@@ -171,78 +110,6 @@ public class KafkaConsumerService { ...@@ -171,78 +110,6 @@ public class KafkaConsumerService {
} }
} }
} }
//
// /**
// * 韶山换流对接Kafka
// * @param record record
// * @param ack ack
// */
// @KafkaListener(id = "kafkaConsumer", groupId = "kafkaConsumerGroup", topics = "#{'${queue.kafka.shaoshan.topics}'.split(',')}", containerFactory = "kafkaRomaContainerFactory")
// public void kafkaConsumer(ConsumerRecord<?, String> record, Acknowledgment ack) {
// Optional<?> message = Optional.ofNullable(record.value());
// if (message.isPresent()) {
// try {
//// JSONObject messageObj = JSONObject.fromObject(record.value());
//// JSONObject data = messageObj.getJSONObject("body");
//// JSONObject object = JSONObject.fromObject(record.value());
//// String json = ClassToJsonUtil.class2json(object, commonMessage, MQTT_TOPIC_EVENT_ALARM);
//// emqKeeper.getMqttClient().publish(MQTT_TOPIC_EVENT_ALARM, json.getBytes(StandardCharsets.UTF_8), 0, false);
//
// JSONObject object = JSONObject.fromObject(record.value());
// com.alibaba.fastjson.JSONObject jsonObj = ClassToJsonUtil.class2json(object, commonMessage, record.topic());
// emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 0, false);
// ack.acknowledge();
// } catch (MqttException e) {
// log.error("解析数据失败,{}", e.getMessage());
// } catch (UnsupportedEncodingException e) {
// e.printStackTrace();
// }
// }
// }
//
// /**
// * 事件告警对接Kafka
// * @param record record
// * @param ack ack
// * groupId = kafkaConsumerGroup
// * 该消息的消息格式为
// * {"data_class":"realdata","data_type":"alarm","op_type":"subscribe_emergency","condition":{"station_psr_id":"50edcb6c1b8a811030493c80a2014950ed9d4f59e8","station_name":"中州换流站","alarm_type":"yx_bw"},"data":[{"psrId":"D017020000000000000000999","astId":"D017020000000000000000999","equipType":"ASTType_0000111","eventType":"OtherSignal","alarmSource":"OWS","alarmLevel":"3","description":"2024-03-11 09:06:17::585 S2WCL12A E3.C01软水器再生结束信号 出现","dateTime":"2024-03-11 09:06:17.585"}]}
// */
//
// @KafkaListener(id = "kafkaConsumerEventAlarm", groupId = "kafkaConsumerGroupEventAlarm", topics = "#{'${queue.kafka.eventAlarm.topics}'.split(',')}", containerFactory = "kafkaRomaContainerFactory")
// public void kafkaConsumerEventAlarm(ConsumerRecord<?, String> record, Acknowledgment ack) {
// Optional<?> message = Optional.ofNullable(record.value());
// if (message.isPresent()) {
// try {
//// JSONObject messageObj = JSONObject.fromObject(record.value());
//// JSONArray dataArray = messageObj.getJSONArray("data");
//// JSONArray jsonArray = new JSONArray();
//// String timestamp = "";
//// for (Object obj : dataArray) {
//// JSONObject finallyObj = new JSONObject();
//// com.alibaba.fastjson.JSONObject detail = com.alibaba.fastjson.JSONObject.parseObject(com.alibaba.fastjson.JSONObject.toJSONString(obj));
//// finallyObj.put("eventtextL1", detail.get("description"));
//// finallyObj.put("pointId", detail.get("astId"));
//// finallyObj.put("time", detail.get("dateTime"));
//// jsonArray.add(finallyObj);
//// timestamp = detail.get("dateTime").toString();
//// }
//// JSONObject jsonObjectMessage = new JSONObject();
//// jsonObjectMessage.put("warns", jsonArray);
//// jsonObjectMessage.put("timestamp", timestamp);
//
// JSONObject object = JSONObject.fromObject(record.value());
// com.alibaba.fastjson.JSONObject jsonObj = ClassToJsonUtil.class2json(object, commonMessage, record.topic());
// emqKeeper.getMqttClient().publish(String.valueOf(jsonObj.get("mqTopic")), JSON.toJSONString(jsonObj).getBytes("UTF-8"), 0, false);
// ack.acknowledge();
// } catch (MqttException e) {
// log.error("解析数据失败,{}", e.getMessage());
// } catch (UnsupportedEncodingException e) {
// e.printStackTrace();
// }
// }
// }
/** /**
* 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"} * 消费单条消息,topics 可以监听多个topic,如:topics = {"topic1", "topic2"}
...@@ -267,85 +134,75 @@ public class KafkaConsumerService { ...@@ -267,85 +134,75 @@ public class KafkaConsumerService {
} }
} }
@Override
public void run(ApplicationArguments args) {
codeListInfo = readExcelFile(filePath);
log.info("excel文件数据:{}", JSON.toJSONString(codeListInfo));
}
///** /**
// * 省级消息转发 * 判断是否发送emq消息
// * * @return true 发送 false 不发送
// * @param message 省级消息 */
// * @param ack ack private Boolean isSendEmq(com.alibaba.fastjson.JSONObject res) {
// */ String key = "";
//@KafkaListener(id = "provinceMessage", groupId = "province", topics = "#{'${kafka.topics}'.split(',')}", concurrency = "2") if (!StringUtils.isEmpty(res.get("mqTopic")) && topic1.equals(res.get("mqTopic").toString())) {
//public void consumerSingle1(String message, Acknowledgment ack) { key = res.getJSONObject("data").get("key").toString();
// log.info("省级消息转发打印" + message); } else if (!StringUtils.isEmpty(res.get("mqTopic")) && topic2.equals(res.get("mqTopic").toString())) {
// if(isZxj) { key = res.getJSONObject("data").getJSONArray("warns").getJSONObject(0).get("pointId").toString();
// Optional<?> messages = Optional.ofNullable(message); }
// if (messages.isPresent()) { return !StringUtils.isEmpty(key) && codeListInfo.contains(key);
// try { }
// JSONObject jsonObject = JSONObject.fromObject(message);
// String type = jsonObject.optString("type");
// String table = jsonObject.optString("table");
// if (Arrays.asList(Constant.INSERT, Constant.UPDATE).contains(type) && DBTableTypeEnum.have(table) != null) {
// if (Arrays.asList("INSERT", "UPDATE").contains(type)) {
// JSONArray array = jsonObject.getJSONArray("data");
// JSONObject data = (JSONObject)array.get(0);
// data.put("dbType", type);
// data.put("table", table);
// emqKeeper.getMqttClient().publish(PROVINCE_MQTT_TOPIC, data.toString().getBytes(StandardCharsets.UTF_8), 0, false);
// log.info("省级消息: {}", data);
// }
// }
// } catch (MqttException e) {
// log.error("消息转发失败" + e.getMessage(), e);
// }
// ack.acknowledge();
// }
// }
//}
public static List<String> readExcelFile(String filePath) {
try (FileInputStream fis = new FileInputStream(new File(filePath));
Workbook workbook = new XSSFWorkbook(fis)) {
Sheet sheet = workbook.getSheetAt(0); // 获取第一个工作表
List<String> list = getColumnData(sheet);
// 在这里处理list中的数据,例如打印、存储等操作
return list;
} catch (IOException e) {
e.printStackTrace();
}
return new ArrayList<>();
}
/* @KafkaListener(id = "consumerBatch", topicPartitions = { private static List<String> getColumnData(Sheet sheet) {
@TopicPartition(topic = "hello-batch1", partitions = "0"), List<String> list = new ArrayList<>();
@TopicPartition(topic = "hello-batch2", partitionOffsets = @PartitionOffset(partition = "2", initialOffset = "4")) Iterator<Row> rowIterator = sheet.iterator();
})*/ rowIterator.next(); // 跳过表头行
// /** while (rowIterator.hasNext()) {
// * 批量消费消息 Row row = rowIterator.next();
// * @param messages Cell cell = row.getCell(0);
// */ if (cell != null) {
// @KafkaListener(id = "consumerBatch", topics = "test-batch") String cellValue = getCellValueAsString(cell);
// public void consumerBatch(List<ConsumerRecord<String, String>> messages) { list.add(cellValue);
// log.info("consumerBatch =====> messageSize: {}", messages.size()); }
// log.info(messages.toString()); }
// } return list;
}
// /** private static String getCellValueAsString(Cell cell) {
// * 指定消费异常处理器 if (cell == null) {
// * @param message return "";
// */ }
// @KafkaListener(id = "consumerException", topics = "kafka-test-topic", errorHandler = "consumerAwareListenerErrorHandler") switch (cell.getCellType()) {
// public void consumerException(String message) { case STRING:
// throw new RuntimeException("consumer exception"); return cell.getStringCellValue();
// } case NUMERIC:
// if (DateUtil.isCellDateFormatted(cell)) {
// /** return cell.getDateCellValue().toString();
// * 验证ConsumerInterceptor } else {
// * @param message return Double.toString(cell.getNumericCellValue());
// */ }
// @KafkaListener(id = "interceptor", topics = "consumer-interceptor") case BOOLEAN:
// public void consumerInterceptor(String message) { return Boolean.toString(cell.getBooleanCellValue());
// log.info("consumerInterceptor ====> message: {}", message); case FORMULA:
// } return cell.getCellFormula();
// default:
// return "";
// }
//kafka的监听器,topic为"zhTest",消费者组为"zhTestGroup" }
//@KafkaListener(topics = "test", groupId = "zhTestGroup")
//public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
// String value = record.value();
// System.out.println(value);
// System.out.println(record);
// //手动提交offset
// ack.acknowledge();
//}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment